# DigiStorage for Developers
"""
Digi Storage API - Python Client (Python 3.8+)
==============================================

Covers: authentication, file listing, upload, download,
move/copy/delete, folder creation, and shared link management.

Requirements:
    pip install requests

Usage:
    client = DigiStorageClient("your@email.ro", "yourpassword")
    client.login()
    mount_id = client.get_primary_mount_id()
    client.upload_file(mount_id, "/path/to/local/file.txt", "/remote/folder/")
    client.download_file(mount_id, "/remote/folder/file.txt", "/local/dest/")
    link = client.create_shared_link(mount_id, "/remote/folder/file.txt")
    print(link["url"])
"""
import os
import requests
from typing import Dict, List, Optional
# Single source of truth for the service host; the endpoints below derive
# from it so that pointing the client at another host is a one-line change.
BASE_URL = "https://storage.rcs-rds.ro"
# Endpoint used to obtain (POST) and revoke (DELETE) session tokens.
TOKEN_URL = BASE_URL + "/token"
# Root of the versioned REST API; every resource path is appended to it.
API_URL = BASE_URL + "/api/v2.1"
# ---------------------------------------------------------------------------
# Client class
# ---------------------------------------------------------------------------
class DigiStorageClient:
    """Thin wrapper around the Digi Storage REST API rooted at ``API_URL``.

    Call :meth:`login` first: every API helper raises ``RuntimeError`` while
    no token is stored.  Non-2xx responses are re-raised by :meth:`_raise` as
    ``requests.HTTPError`` with the start of the response body appended to
    the message.
    """

    def __init__(self, email: str, password: str):
        """Store the credentials and create the HTTP session (no network I/O)."""
        self.email = email
        self.password = password
        # Set by login(), cleared by logout().
        self.token: Optional[str] = None
        self.session = requests.Session()
        self.session.headers.update({"Accept": "application/json"})

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _auth_headers(self) -> Dict[str, str]:
        """Return the Authorization header built from the stored token.

        Raises:
            RuntimeError: if no token is stored (login() was not called).
        """
        if not self.token:
            raise RuntimeError("Not authenticated. Call login() first.")
        return {"Authorization": 'Token token="{}"'.format(self.token)}

    def _request(self, method: str, path: str, **kwargs) -> requests.Response:
        """Send an authenticated *method* request to ``API_URL`` + *path*.

        Extra keyword arguments (``params``, ``json``, ...) are forwarded to
        the session unchanged.
        """
        return self.session.request(
            method,
            "{}{}".format(API_URL, path),
            headers=self._auth_headers(),
            **kwargs
        )

    def _get(self, path: str, **kwargs) -> requests.Response:
        """Authenticated GET on an API *path*."""
        return self._request("GET", path, **kwargs)

    def _post(self, path: str, **kwargs) -> requests.Response:
        """Authenticated POST on an API *path*."""
        return self._request("POST", path, **kwargs)

    def _put(self, path: str, **kwargs) -> requests.Response:
        """Authenticated PUT on an API *path*."""
        return self._request("PUT", path, **kwargs)

    def _delete(self, path: str, **kwargs) -> requests.Response:
        """Authenticated DELETE on an API *path*.

        Rooted at ``API_URL`` like the other verbs: callers pass resource
        paths such as ``/mounts/<id>/files/remove``, which do not exist under
        the token endpoint.
        """
        return self._request("DELETE", path, **kwargs)

    @staticmethod
    def _raise(resp: requests.Response):
        """Raise a readable error on non-2xx responses.

        Raises:
            requests.HTTPError: carrying the original message plus the first
                300 characters of the response body; the original exception
                is kept as ``__cause__``.
        """
        try:
            resp.raise_for_status()
        except requests.HTTPError as exc:
            raise requests.HTTPError(
                "{} — body: {}".format(exc, resp.text[:300]),
                response=resp
            ) from exc

    # ------------------------------------------------------------------
    # 1. Authentication
    # ------------------------------------------------------------------
    def login(self) -> str:
        """Authenticate and store the session token. Returns the token."""
        resp = self.session.post(
            TOKEN_URL,
            json={"email": self.email, "password": self.password},
        )
        self._raise(resp)
        self.token = resp.json()["token"]
        print("[+] Logged in as {}".format(self.email))
        return self.token

    def logout(self):
        """Revoke the current session token and forget it locally.

        Raises:
            RuntimeError: if not logged in.
            requests.HTTPError: if the server rejects the request; the local
                token is kept in that case.
        """
        # The token resource lives at TOKEN_URL, outside the versioned API
        # root, so this request deliberately bypasses _delete().
        resp = self.session.delete(TOKEN_URL, headers=self._auth_headers())
        self._raise(resp)
        print("[+] Logged out.")
        self.token = None

    # ------------------------------------------------------------------
    # 2. User
    # ------------------------------------------------------------------
    def get_user_info(self) -> Dict:
        """Return the decoded JSON body of ``GET /user``."""
        resp = self._get("/user")
        self._raise(resp)
        return resp.json()

    # ------------------------------------------------------------------
    # 8. Mounts
    # ------------------------------------------------------------------
    def list_mounts(self, mount_type: Optional[str] = None) -> List[Dict]:
        """
        List all mounts (storage entrypoints).

        mount_type - optional filter: 'device', 'export', or 'import'.
        When omitted (or empty) no ``type`` query parameter is sent.
        """
        params = {}
        if mount_type:
            params["type"] = mount_type
        resp = self._get("/mounts", params=params)
        self._raise(resp)
        return resp.json()["mounts"]

    def get_primary_mount_id(self) -> str:
        """
        Convenience: return the ID of the first 'device' mount
        (i.e. your personal Digi Storage space).

        Raises:
            RuntimeError: if the account has no 'device' mount.
        """
        mounts = self.list_mounts(mount_type="device")
        if not mounts:
            raise RuntimeError("No device mounts found for this account.")
        mount = mounts[0]
        print("[+] Using mount: {} (id={})".format(mount["name"], mount["id"]))
        return mount["id"]

    # ------------------------------------------------------------------
    # 9. Files
    # ------------------------------------------------------------------
    def list_files(self, mount_id: str, remote_path: str = "/") -> List[Dict]:
        """List files and folders at *remote_path* on *mount_id*."""
        resp = self._get(
            "/mounts/{}/files/list".format(mount_id),
            params={"path": remote_path}
        )
        self._raise(resp)
        return resp.json()["files"]

    def file_info(self, mount_id: str, remote_path: str) -> Dict:
        """Return metadata for a single file or folder."""
        resp = self._get(
            "/mounts/{}/files/info".format(mount_id),
            params={"path": remote_path}
        )
        self._raise(resp)
        return resp.json()

    def create_folder(self, mount_id: str, parent_path: str,
                      folder_name: str) -> None:
        """Create a new folder *folder_name* inside *parent_path*."""
        resp = self._post(
            "/mounts/{}/files/folder".format(mount_id),
            params={"path": parent_path},
            json={"name": folder_name},
        )
        self._raise(resp)
        print("[+] Folder created: {}/{}".format(
            parent_path.rstrip("/"), folder_name))

    def upload_file(self, mount_id: str, local_path: str,
                    remote_folder: str = "/") -> Dict:
        """
        Upload a local file to *remote_folder* on the given mount.

        Returns the JSON response from the blob endpoint.

        Raises:
            FileNotFoundError: if *local_path* is not an existing file
                (checked before any request is sent).
        """
        if not os.path.isfile(local_path):
            raise FileNotFoundError(
                "Local file not found: {}".format(local_path))
        # Step 1 - obtain a one-time upload URL
        resp = self._get(
            "/mounts/{}/files/upload".format(mount_id),
            params={"path": remote_folder}
        )
        self._raise(resp)
        upload_url = resp.json()["link"]
        # Step 2 - POST the file as multipart/form-data (no auth header needed)
        filename = os.path.basename(local_path)
        with open(local_path, "rb") as fh:
            upload_resp = self.session.post(
                upload_url,
                files={"file": (filename, fh)},
                headers={},  # no Auth header for blob endpoint
            )
        self._raise(upload_resp)
        result = upload_resp.json()
        print("[+] Uploaded '{}' -> {}".format(filename, remote_folder))
        return result

    def download_file(self, mount_id: str, remote_path: str,
                      local_dir: str = ".") -> str:
        """
        Download a file from *remote_path* and save it to *local_dir*.

        The local file keeps the base name of *remote_path*; *local_dir* is
        created if missing.  Returns the local destination path.

        Raises:
            ValueError: if *remote_path* has no final file-name component
                (e.g. it ends with '/'), since there would be nothing to
                name the local file after.
        """
        filename = os.path.basename(remote_path)
        if not filename:
            # Fail before touching the network rather than with an obscure
            # open() error on the directory path afterwards.
            raise ValueError(
                "remote_path must name a file, got: {!r}".format(remote_path))
        # Step 1 - get the direct download link
        resp = self._get(
            "/mounts/{}/files/download".format(mount_id),
            params={"path": remote_path}
        )
        self._raise(resp)
        download_url = resp.json()["link"]
        local_path = os.path.join(local_dir, filename)
        # Step 2 - fetch the actual bytes (no auth needed).  The context
        # manager releases the streamed connection even if writing fails.
        with self.session.get(download_url, stream=True) as dl_resp:
            self._raise(dl_resp)
            os.makedirs(local_dir, exist_ok=True)
            with open(local_path, "wb") as fh:
                for chunk in dl_resp.iter_content(chunk_size=8192):
                    fh.write(chunk)
        print("[+] Downloaded '{}' -> {}".format(remote_path, local_path))
        return local_path

    def move_file(self, mount_id: str, from_path: str,
                  to_mount_id: str, to_path: str) -> Dict:
        """Move or rename a file/folder. Returns the decoded JSON response."""
        resp = self._put(
            "/mounts/{}/files/move".format(mount_id),
            params={"path": from_path},
            json={"toMountId": to_mount_id, "toPath": to_path},
        )
        self._raise(resp)
        result = resp.json()
        print("[+] Moved '{}' -> '{}'".format(from_path, to_path))
        return result

    def copy_file(self, mount_id: str, from_path: str,
                  to_mount_id: str, to_path: str) -> Dict:
        """Copy a file/folder to a new location. Returns the decoded JSON response."""
        resp = self._put(
            "/mounts/{}/files/copy".format(mount_id),
            params={"path": from_path},
            json={"toMountId": to_mount_id, "toPath": to_path},
        )
        self._raise(resp)
        result = resp.json()
        print("[+] Copied '{}' -> '{}'".format(from_path, to_path))
        return result

    def rename_file(self, mount_id: str, remote_path: str,
                    new_name: str) -> None:
        """Rename a file or folder in place."""
        resp = self._put(
            "/mounts/{}/files/rename".format(mount_id),
            params={"path": remote_path},
            json={"name": new_name},
        )
        self._raise(resp)
        print("[+] Renamed '{}' -> '{}'".format(remote_path, new_name))

    def delete_file(self, mount_id: str, remote_path: str) -> None:
        """Permanently delete a file or folder."""
        resp = self._delete(
            "/mounts/{}/files/remove".format(mount_id),
            params={"path": remote_path}
        )
        self._raise(resp)
        print("[+] Deleted '{}'".format(remote_path))

    # ------------------------------------------------------------------
    # 11. Shared Links (download links)
    # ------------------------------------------------------------------
    def create_shared_link(self, mount_id: str, remote_path: str) -> Dict:
        """
        Create a public download link for a file or folder.

        Returns the full link object; its 'url' key is required here, while
        'shortUrl' is treated as optional.
        """
        resp = self._post(
            "/mounts/{}/links".format(mount_id),
            json={"path": remote_path},
        )
        self._raise(resp)
        link = resp.json()
        print("[+] Shared link created: {}".format(link["url"]))
        print(" Short URL : {}".format(link.get("shortUrl", "n/a")))
        return link

    def list_shared_links(self, mount_id: str,
                          path_filter: Optional[str] = None) -> List[Dict]:
        """
        List all download links on a mount.

        Optionally filter by exact *path_filter*.
        """
        params = {}
        if path_filter:
            params["path"] = path_filter
        resp = self._get("/mounts/{}/links".format(mount_id), params=params)
        self._raise(resp)
        return resp.json()["links"]

    def delete_shared_link(self, mount_id: str, link_id: str) -> None:
        """Remove a download link by its ID."""
        resp = self._delete("/mounts/{}/links/{}".format(mount_id, link_id))
        self._raise(resp)
        print("[+] Deleted link {}".format(link_id))

    def set_link_password(self, mount_id: str, link_id: str) -> Dict:
        """Set (or reset) the password on a download link. Returns new password."""
        resp = self._put(
            "/mounts/{}/links/{}/password/reset".format(mount_id, link_id)
        )
        self._raise(resp)
        result = resp.json()
        print("[+] Link password set: {}".format(result.get("password")))
        return result

    def remove_link_password(self, mount_id: str, link_id: str) -> Dict:
        """Remove the password from a download link. Returns the decoded JSON response."""
        resp = self._delete(
            "/mounts/{}/links/{}/password".format(mount_id, link_id)
        )
        self._raise(resp)
        print("[+] Link password removed.")
        return resp.json()

    # ------------------------------------------------------------------
    # 12. Receivers (upload links)
    # ------------------------------------------------------------------
    def create_receiver(self, mount_id: str,
                        remote_folder: str) -> Dict:
        """
        Create an upload link (receiver) pointing to *remote_folder*.

        The path must end with a trailing slash; one is appended when missing.
        Returns the receiver object.
        """
        if not remote_folder.endswith("/"):
            remote_folder += "/"
        resp = self._post(
            "/mounts/{}/receivers".format(mount_id),
            json={"path": remote_folder},
        )
        self._raise(resp)
        receiver = resp.json()
        print("[+] Upload receiver created: {}".format(receiver["url"]))
        return receiver

    def list_receivers(self, mount_id: str) -> List[Dict]:
        """List all upload receivers on a mount."""
        resp = self._get("/mounts/{}/receivers".format(mount_id))
        self._raise(resp)
        return resp.json()["receivers"]
# ---------------------------------------------------------------------------
# Demo / quick-start
# ---------------------------------------------------------------------------
def demo():
    """
    End-to-end demonstration of the client.

    Set DIGI_EMAIL and DIGI_PASSWORD as environment variables, or edit the
    placeholder defaults below.  The walkthrough logs in, lists mounts and
    the root folder, creates a demo folder, uploads and re-downloads a small
    text file, shares it with a password-protected link, lists all links and
    logs out.  The temporary local file is removed and the session token is
    revoked even when a step fails.
    """
    email = os.environ.get("DIGI_EMAIL", "your@email.ro")
    password = os.environ.get("DIGI_PASSWORD", "yourpassword")
    client = DigiStorageClient(email, password)

    # --- Authenticate ---
    # Kept outside the try block: if login fails there is no token to revoke.
    client.login()
    try:
        # --- User info ---
        info = client.get_user_info()
        print("\nUser: {} {} <{}>".format(
            info["firstName"], info["lastName"], info["email"]))

        # --- List mounts ---
        mounts = client.list_mounts()
        print("\nMounts ({}):".format(len(mounts)))
        for m in mounts:
            print(" - {} type={} id={}".format(m["name"], m["type"], m["id"]))

        # Pick the primary (device) mount
        mount_id = client.get_primary_mount_id()

        # --- Browse root ---
        print("\nRoot contents:")
        for f in client.list_files(mount_id, "/"):
            ftype = "DIR " if f["type"] == "dir" else "FILE"
            size = "{:,} B".format(f["size"]) if f["type"] == "file" else ""
            print(" [{}] {} {}".format(ftype, f["name"], size))

        # --- Create a demo folder ---
        client.create_folder(mount_id, "/", "DigiStorageDemo")

        # --- Upload a test file ---
        test_file = "demo_upload.txt"
        with open(test_file, "w", encoding="utf-8") as fh:
            fh.write("Hello from the Digi Storage API demo!\n")
        try:
            client.upload_file(mount_id, test_file, "/DigiStorageDemo/")
        finally:
            # Do not leave the scratch file behind if the upload fails.
            os.remove(test_file)

        # --- Download it back ---
        client.download_file(
            mount_id, "/DigiStorageDemo/demo_upload.txt",
            local_dir="./digi_downloads"
        )

        # --- Create a shared (public) download link ---
        link = client.create_shared_link(
            mount_id, "/DigiStorageDemo/demo_upload.txt")
        print("\nShare this link: {}".format(link["url"]))
        print("Short URL : {}".format(link.get("shortUrl", "n/a")))

        # --- Optional: password-protect the link ---
        protected = client.set_link_password(mount_id, link["id"])
        print("Password : {}".format(protected.get("password")))

        # --- Optional: create an upload receiver (inbox) ---
        # receiver = client.create_receiver(mount_id, "/DigiStorageDemo/Inbox/")
        # print("\nUpload receiver: {}".format(receiver["url"]))

        # --- List all download links ---
        links = client.list_shared_links(mount_id)
        print("\nAll download links ({}):".format(len(links)))
        for lnk in links:
            print(" {} -> {}".format(lnk["path"], lnk["url"]))
    finally:
        # --- Logout ---
        # Always revoke the token so a failed run does not leave it active.
        client.logout()
    print("\n[done]")


# Run the walkthrough only when executed as a script, not on import.
if __name__ == "__main__":
    demo()