# Digi Storage for Developers

"""
Digi Storage API - Python Client  (Python 3.8+)
=================================================
Covers: authentication, file listing, upload, download,
        move/copy/delete, folder creation, and shared link management.

Requirements:
    pip install requests

Usage:
    client = DigiStorageClient("your@email.ro", "yourpassword")
    client.login()
    mount_id = client.get_primary_mount_id()

    client.upload_file(mount_id, "/path/to/local/file.txt", "/remote/folder/")
    client.download_file(mount_id, "/remote/folder/file.txt", "/local/dest/")
    link = client.create_shared_link(mount_id, "/remote/folder/file.txt")
    print(link["url"])
"""

import os
import requests
from typing import Dict, List, Optional


# Service host; the token and API endpoints below are derived from it so the
# host name is spelled out in exactly one place.
BASE_URL = "https://storage.rcs-rds.ro"

# Endpoint used to obtain (POST) and revoke (DELETE) session tokens.
TOKEN_URL = f"{BASE_URL}/token"

# Root of the versioned REST API; request paths are appended to it.
API_URL = f"{BASE_URL}/api/v2.1"


# ---------------------------------------------------------------------------
# Client class
# ---------------------------------------------------------------------------

class DigiStorageClient:
    """
    Thin wrapper around the Digi Storage REST API (v2.1 endpoints).

    Typical flow: construct the client, call login(), resolve a mount with
    get_primary_mount_id(), then use the file / link / receiver helpers.

    Methods that talk to the API raise RuntimeError when called before
    login() and requests.HTTPError when the server answers with an HTTP
    error status (4xx/5xx).
    """

    def __init__(self, email: str, password: str,
                 timeout: Optional[float] = None):
        """
        email / password - account credentials sent by login().
        timeout          - value passed as the ``timeout`` argument of every
                           HTTP call made by this client. None (the default)
                           means no timeout, which is also what requests
                           does when the argument is omitted.
        """
        self.email = email
        self.password = password
        self.token: Optional[str] = None
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({"Accept": "application/json"})

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _auth_headers(self) -> Dict[str, str]:
        """Return the Authorization header; raise RuntimeError if no token."""
        if not self.token:
            raise RuntimeError("Not authenticated. Call login() first.")
        return {"Authorization": f'Token token="{self.token}"'}

    def _request(self, method: str, url: str, **kwargs) -> "requests.Response":
        """
        Send an authenticated *method* request to the absolute *url*.
        Extra keyword arguments are forwarded to the session; the client's
        timeout is applied unless the caller passes its own.
        """
        kwargs.setdefault("timeout", self.timeout)
        return self.session.request(
            method, url, headers=self._auth_headers(), **kwargs)

    def _get(self, path: str, **kwargs) -> "requests.Response":
        """Authenticated GET on API_URL + *path*."""
        return self._request("GET", f"{API_URL}{path}", **kwargs)

    def _post(self, path: str, **kwargs) -> "requests.Response":
        """Authenticated POST on API_URL + *path*."""
        return self._request("POST", f"{API_URL}{path}", **kwargs)

    def _put(self, path: str, **kwargs) -> "requests.Response":
        """Authenticated PUT on API_URL + *path*."""
        return self._request("PUT", f"{API_URL}{path}", **kwargs)

    def _delete(self, path: str, **kwargs) -> "requests.Response":
        """Authenticated DELETE on API_URL + *path*."""
        # API paths must be rooted at API_URL like every other verb; the
        # token endpoint is handled separately in logout().
        return self._request("DELETE", f"{API_URL}{path}", **kwargs)

    @staticmethod
    def _raise(resp: "requests.Response") -> None:
        """
        Raise requests.HTTPError if *resp* carries an HTTP error status.
        The message is extended with the first 300 characters of the body
        so that server-side error details are visible to the caller.
        """
        try:
            resp.raise_for_status()
        except requests.HTTPError as exc:
            raise requests.HTTPError(
                f"{exc} — body: {resp.text[:300]}",
                response=resp
            ) from exc

    # ------------------------------------------------------------------
    # 1. Authentication
    # ------------------------------------------------------------------

    def login(self) -> str:
        """Authenticate and store the session token. Returns the token."""
        resp = self.session.post(
            TOKEN_URL,
            json={"email": self.email, "password": self.password},
            timeout=self.timeout,
        )
        self._raise(resp)
        self.token = resp.json()["token"]
        print(f"[+] Logged in as {self.email}")
        return self.token

    def logout(self) -> None:
        """Revoke the current session token and forget it locally."""
        # The token endpoint lives outside API_URL, so it is addressed
        # directly instead of going through _delete().
        resp = self._request("DELETE", TOKEN_URL)
        self._raise(resp)
        print("[+] Logged out.")
        self.token = None

    # ------------------------------------------------------------------
    # 2. User
    # ------------------------------------------------------------------

    def get_user_info(self) -> Dict:
        """Return the decoded JSON body of GET /user."""
        resp = self._get("/user")
        self._raise(resp)
        return resp.json()

    # ------------------------------------------------------------------
    # 8. Mounts
    # ------------------------------------------------------------------

    def list_mounts(self, mount_type: Optional[str] = None) -> List[Dict]:
        """
        List all mounts (storage entrypoints).
        mount_type - optional filter: 'device', 'export', or 'import'.
        Returns the "mounts" list of the response.
        """
        params = {}
        if mount_type:
            params["type"] = mount_type
        resp = self._get("/mounts", params=params)
        self._raise(resp)
        return resp.json()["mounts"]

    def get_primary_mount_id(self) -> str:
        """
        Convenience: return the ID of the first 'device' mount
        (i.e. your personal Digi Storage space).
        Raises RuntimeError when the account has no device mount.
        """
        mounts = self.list_mounts(mount_type="device")
        if not mounts:
            raise RuntimeError("No device mounts found for this account.")
        mount = mounts[0]
        print(f"[+] Using mount: {mount['name']}  (id={mount['id']})")
        return mount["id"]

    # ------------------------------------------------------------------
    # 9. Files
    # ------------------------------------------------------------------

    def list_files(self, mount_id: str, remote_path: str = "/") -> List[Dict]:
        """List files and folders at *remote_path* on *mount_id*."""
        resp = self._get(
            f"/mounts/{mount_id}/files/list",
            params={"path": remote_path}
        )
        self._raise(resp)
        return resp.json()["files"]

    def file_info(self, mount_id: str, remote_path: str) -> Dict:
        """Return metadata for a single file or folder."""
        resp = self._get(
            f"/mounts/{mount_id}/files/info",
            params={"path": remote_path}
        )
        self._raise(resp)
        return resp.json()

    def create_folder(self, mount_id: str, parent_path: str,
                      folder_name: str) -> None:
        """Create a new folder *folder_name* inside *parent_path*."""
        resp = self._post(
            f"/mounts/{mount_id}/files/folder",
            params={"path": parent_path},
            json={"name": folder_name},
        )
        self._raise(resp)
        parent = parent_path.rstrip("/")
        print(f"[+] Folder created: {parent}/{folder_name}")

    def upload_file(self, mount_id: str, local_path: str,
                    remote_folder: str = "/") -> Dict:
        """
        Upload a local file to *remote_folder* on the given mount.
        Returns the JSON response from the blob endpoint.
        Raises FileNotFoundError when *local_path* is not an existing file.
        """
        if not os.path.isfile(local_path):
            raise FileNotFoundError(f"Local file not found: {local_path}")

        # Step 1 - obtain a one-time upload URL
        resp = self._get(
            f"/mounts/{mount_id}/files/upload",
            params={"path": remote_folder}
        )
        self._raise(resp)
        upload_url = resp.json()["link"]

        # Step 2 - POST the file as multipart/form-data (no auth header needed)
        filename = os.path.basename(local_path)
        with open(local_path, "rb") as fh:
            upload_resp = self.session.post(
                upload_url,
                files={"file": (filename, fh)},
                headers={},   # no Auth header for blob endpoint
                timeout=self.timeout,
            )
        self._raise(upload_resp)
        result = upload_resp.json()
        print(f"[+] Uploaded '{filename}' -> {remote_folder}")
        return result

    def download_file(self, mount_id: str, remote_path: str,
                      local_dir: str = ".") -> str:
        """
        Download a file from *remote_path* and save it to *local_dir*
        (created if missing) under the remote file's base name.
        Returns the local destination path.
        Raises ValueError when *remote_path* has no file name component.
        """
        filename = os.path.basename(remote_path)
        if not filename:
            # Fail before any network traffic: without a base name the
            # destination would be the directory itself.
            raise ValueError(
                f"remote_path must name a file, got: {remote_path!r}")
        local_path = os.path.join(local_dir, filename)

        # Step 1 - get the direct download link
        resp = self._get(
            f"/mounts/{mount_id}/files/download",
            params={"path": remote_path}
        )
        self._raise(resp)
        download_url = resp.json()["link"]

        # Step 2 - fetch the actual bytes (no auth needed)
        dl_resp = self.session.get(
            download_url, stream=True, timeout=self.timeout)
        try:
            self._raise(dl_resp)
            os.makedirs(local_dir, exist_ok=True)
            with open(local_path, "wb") as fh:
                for chunk in dl_resp.iter_content(chunk_size=8192):
                    fh.write(chunk)
        finally:
            # A streamed response keeps its connection checked out until it
            # is fully read or closed, so always release it.
            dl_resp.close()

        print(f"[+] Downloaded '{remote_path}' -> {local_path}")
        return local_path

    def move_file(self, mount_id: str, from_path: str,
                  to_mount_id: str, to_path: str) -> Dict:
        """Move or rename a file/folder. Returns the decoded JSON response."""
        resp = self._put(
            f"/mounts/{mount_id}/files/move",
            params={"path": from_path},
            json={"toMountId": to_mount_id, "toPath": to_path},
        )
        self._raise(resp)
        result = resp.json()
        print(f"[+] Moved '{from_path}' -> '{to_path}'")
        return result

    def copy_file(self, mount_id: str, from_path: str,
                  to_mount_id: str, to_path: str) -> Dict:
        """Copy a file/folder to a new location. Returns the JSON response."""
        resp = self._put(
            f"/mounts/{mount_id}/files/copy",
            params={"path": from_path},
            json={"toMountId": to_mount_id, "toPath": to_path},
        )
        self._raise(resp)
        result = resp.json()
        print(f"[+] Copied '{from_path}' -> '{to_path}'")
        return result

    def rename_file(self, mount_id: str, remote_path: str,
                    new_name: str) -> None:
        """Rename a file or folder in place."""
        resp = self._put(
            f"/mounts/{mount_id}/files/rename",
            params={"path": remote_path},
            json={"name": new_name},
        )
        self._raise(resp)
        print(f"[+] Renamed '{remote_path}' -> '{new_name}'")

    def delete_file(self, mount_id: str, remote_path: str) -> None:
        """Permanently delete a file or folder."""
        resp = self._delete(
            f"/mounts/{mount_id}/files/remove",
            params={"path": remote_path}
        )
        self._raise(resp)
        print(f"[+] Deleted '{remote_path}'")

    # ------------------------------------------------------------------
    # 11. Shared Links (download links)
    # ------------------------------------------------------------------

    def create_shared_link(self, mount_id: str, remote_path: str) -> Dict:
        """
        Create a public download link for a file or folder.
        Returns the full link object including 'url' and 'shortUrl'.
        """
        resp = self._post(
            f"/mounts/{mount_id}/links",
            json={"path": remote_path},
        )
        self._raise(resp)
        link = resp.json()
        short_url = link.get("shortUrl", "n/a")
        print(f"[+] Shared link created: {link['url']}")
        print(f"    Short URL          : {short_url}")
        return link

    def list_shared_links(self, mount_id: str,
                          path_filter: Optional[str] = None) -> List[Dict]:
        """
        List all download links on a mount.
        Optionally filter by exact *path_filter*.
        """
        params = {}
        if path_filter:
            params["path"] = path_filter
        resp = self._get(f"/mounts/{mount_id}/links", params=params)
        self._raise(resp)
        return resp.json()["links"]

    def delete_shared_link(self, mount_id: str, link_id: str) -> None:
        """Remove a download link by its ID."""
        resp = self._delete(f"/mounts/{mount_id}/links/{link_id}")
        self._raise(resp)
        print(f"[+] Deleted link {link_id}")

    def set_link_password(self, mount_id: str, link_id: str) -> Dict:
        """
        Set (or reset) the password on a download link.
        Returns the decoded JSON response; its 'password' entry, when
        present, is also printed.
        """
        resp = self._put(
            f"/mounts/{mount_id}/links/{link_id}/password/reset"
        )
        self._raise(resp)
        result = resp.json()
        new_password = result.get("password")
        print(f"[+] Link password set: {new_password}")
        return result

    def remove_link_password(self, mount_id: str, link_id: str) -> Dict:
        """Remove the password from a download link. Returns the JSON body."""
        resp = self._delete(
            f"/mounts/{mount_id}/links/{link_id}/password"
        )
        self._raise(resp)
        print("[+] Link password removed.")
        return resp.json()

    # ------------------------------------------------------------------
    # 12. Receivers (upload links)
    # ------------------------------------------------------------------

    def create_receiver(self, mount_id: str,
                        remote_folder: str) -> Dict:
        """
        Create an upload link (receiver) pointing to *remote_folder*.
        A trailing slash is appended to the path when it is missing.
        Returns the receiver object.
        """
        if not remote_folder.endswith("/"):
            remote_folder += "/"
        resp = self._post(
            f"/mounts/{mount_id}/receivers",
            json={"path": remote_folder},
        )
        self._raise(resp)
        receiver = resp.json()
        print(f"[+] Upload receiver created: {receiver['url']}")
        return receiver

    def list_receivers(self, mount_id: str) -> List[Dict]:
        """List all upload receivers on a mount."""
        resp = self._get(f"/mounts/{mount_id}/receivers")
        self._raise(resp)
        return resp.json()["receivers"]


# ---------------------------------------------------------------------------
# Demo / quick-start
# ---------------------------------------------------------------------------

def demo():
    """
    End-to-end demonstration of the client.
    Set DIGI_EMAIL and DIGI_PASSWORD as environment variables, or edit below.

    Walks through: login, user info, mount listing, root listing, folder
    creation, upload, download, shared-link creation with a password, and
    link listing. Once login has succeeded the session token is revoked on
    exit even if one of the steps raises.
    """
    email = os.environ.get("DIGI_EMAIL", "your@email.ro")
    password = os.environ.get("DIGI_PASSWORD", "yourpassword")

    client = DigiStorageClient(email, password)

    # --- Authenticate ---
    client.login()

    try:
        # --- User info ---
        info = client.get_user_info()
        print("\nUser: {} {} <{}>".format(
            info["firstName"], info["lastName"], info["email"]))

        # --- List mounts ---
        mounts = client.list_mounts()
        print("\nMounts ({}):".format(len(mounts)))
        for m in mounts:
            print("  - {}  type={}  id={}".format(
                m["name"], m["type"], m["id"]))

        # Pick the primary (device) mount
        mount_id = client.get_primary_mount_id()

        # --- Browse root ---
        print("\nRoot contents:")
        for f in client.list_files(mount_id, "/"):
            ftype = "DIR " if f["type"] == "dir" else "FILE"
            size = "{:,} B".format(f["size"]) if f["type"] == "file" else ""
            print("  [{}] {}  {}".format(ftype, f["name"], size))

        # --- Create a demo folder ---
        client.create_folder(mount_id, "/", "DigiStorageDemo")

        # --- Upload a test file ---
        test_file = "demo_upload.txt"
        with open(test_file, "w", encoding="utf-8") as fh:
            fh.write("Hello from the Digi Storage API demo!\n")
        try:
            client.upload_file(mount_id, test_file, "/DigiStorageDemo/")
        finally:
            # Remove the scratch file even when the upload fails.
            os.remove(test_file)

        # --- Download it back ---
        client.download_file(
            mount_id, "/DigiStorageDemo/demo_upload.txt",
            local_dir="./digi_downloads"
        )

        # --- Create a shared (public) download link ---
        link = client.create_shared_link(
            mount_id, "/DigiStorageDemo/demo_upload.txt")
        print("\nShare this link: {}".format(link["url"]))
        print("Short URL      : {}".format(link.get("shortUrl", "n/a")))

        # --- Optional: password-protect the link ---
        protected = client.set_link_password(mount_id, link["id"])
        print("Password       : {}".format(protected.get("password")))

        # --- Create an upload receiver (inbox) ---
        # Disabled by default; uncomment to try it out.
        # receiver = client.create_receiver(mount_id, "/DigiStorageDemo/Inbox/")
        # print("\nUpload receiver: {}".format(receiver["url"]))

        # --- List all download links ---
        links = client.list_shared_links(mount_id)
        print("\nAll download links ({}):".format(len(links)))
        for lnk in links:
            print("  {}  ->  {}".format(lnk["path"], lnk["url"]))
    finally:
        # --- Logout ---
        # Always revoke the token so a failed step does not leave a live
        # session behind.
        client.logout()

    print("\n[done]")


# Run the end-to-end demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    demo()