diff --git a/deps.edn b/deps.edn index d796e21e..37ec0295 100644 --- a/deps.edn +++ b/deps.edn @@ -38,6 +38,7 @@ cheshire/cheshire {:mvn/version "5.10.0"} com.cognitect/transit-clj {:mvn/version "1.0.324"} com.cognitect/transit-cljs {:mvn/version "0.8.269"} + org.clojure/core.async {:mvn/version "1.6.681"} } } diff --git a/docs/sync.md b/docs/sync.md new file mode 100644 index 00000000..8e499406 --- /dev/null +++ b/docs/sync.md @@ -0,0 +1,51 @@ +Server assigns id: + +- Changes when client reconnects + +Client assigns id: + +- What to do with client before :catch-up message? +- Where to store send-fn? +- Can track how far each client is. Why? + + +``` +tx :: {:tx-data [...] + :tx-id + :server-idx } +``` + +# Client connects to a server + +``` +SND {:message :catching-up + :patterns [ ...] + :server-idx ?} + +RCV {:message :catched-up + :snapshot + :server-idx } + +or + +RCV {:message :catched-up + :txs [ ...]} +``` + +# Client makes a transaction + +``` +SND {:message :transacting + :server-idx server-idx + :txs [{:tx-data ... + :tx-id ...} ...]} +``` + +# Server broadcasts a transaction + +``` +RCV {:message :transacted + :tx-data ... + :tx-id ... + :server-idx ...} ...]} +``` diff --git a/project.clj b/project.clj index 7d7b3d0c..f6b7118f 100644 --- a/project.clj +++ b/project.clj @@ -86,7 +86,8 @@ :test {:dependencies [[metosin/jsonista "0.3.3"] [cheshire "5.10.0"] [com.cognitect/transit-clj "1.0.324"] - [com.cognitect/transit-cljs "0.8.269"]]} + [com.cognitect/transit-cljs "0.8.269"] + [org.clojure/core.async "1.6.681"]]} :bench {:dependencies [[criterium "0.4.6"] [metosin/jsonista "0.3.3"] [com.clojure-goes-fast/clj-async-profiler "0.5.1"]]} diff --git a/src/datascript/db.cljc b/src/datascript/db.cljc index 5ed8f4f1..69fc4ecf 100644 --- a/src/datascript/db.cljc +++ b/src/datascript/db.cljc @@ -1779,3 +1779,7 @@ :else (raise "Bad entity type at " entity ", expected map or vector" {:error :transact/syntax, :tx-data entity}))))) + +(defn tx-from-datoms [datoms] + (mapv #(vector (if (:added %) :db/add :db/retract) (:e %) (:a %) (:v %)) datoms)) + \ No newline at end of file diff --git a/src/datascript/sync/client.cljc b/src/datascript/sync/client.cljc new file mode 100644 index 00000000..f01981ca --- /dev/null +++ b/src/datascript/sync/client.cljc @@ -0,0 +1,82 @@ +(ns datascript.sync.client + (:require + [datascript.conn :as conn] + [datascript.db :as db] + [datascript.serialize :as serialize] + [datascript.util :as util])) + +(defn client-id [] + (long (* (rand) 0x1FFFFFFFFFFFFF))) + +(def *last-tx-id + (atom 0)) + +(defn new-tx-id [client-id] + [client-id (swap! *last-tx-id inc)]) + +(defn on-tx [conn report] + (when-not (:server? (:tx-meta report)) + (let [{:keys [client-id send-fn server-idx connected?]} @(:atom conn) + tx {:tx-data (db/tx-from-datoms (:tx-data report)) + :tx-id (new-tx-id client-id)}] + (when connected? + (send-fn + {:message :transacting + :server-idx server-idx + :txs [tx]})) + (swap! (:atom conn) update :pending conj tx)))) + +(defn create-conn [patterns send-fn] + (let [res (@#'conn/make-conn + {:db nil + :client-id (client-id) + :server-db nil + :pending util/empty-queue + :server-idx nil + :connected? true + :send-fn send-fn})] + (send-fn {:message :catching-up}) + res)) + +(defn server-message [conn body] + (case (:message body) + :catched-up + (let [{:keys [snapshot server-idx]} body + db (serialize/from-serializable snapshot)] + (conn/reset-conn! conn db {:server? true}) + (swap! (:atom conn) + (fn [atom] + (-> atom + (assoc :server-db db) + (assoc :server-idx server-idx) + (update :listeners assoc :sync #(on-tx conn %)))))) + + :transacted + (let [{:keys [tx-data tx-id server-idx]} body + {server-db :server-db + pending :pending + listeners :listeners} @(:atom conn) + report (conn/with server-db tx-data {:server? true}) + server-db' (:db-after report)] + (swap! (:atom conn) assoc + :server-db server-db' + :server-idx server-idx) + (if (= tx-id (:tx-id (peek pending))) + (swap! (:atom conn) update :pending pop) + (do + (reset! conn (reduce conn/db-with server-db' pending)) + (doseq [[_ callback] listeners] + (callback report)))))) + nil) + +(defn server-disconnected [conn] + (swap! (:atom conn) + #(-> % + (assoc :connected? false) + (update :listeners dissoc :sync)))) + +(defn server-connected [conn] + (swap! (:atom conn) assoc :connected? true) + (let [{:keys [send-fn server-idx]} @(:atom conn)] + (send-fn {:message :catching-up + :server-idx server-idx}))) diff --git a/src/datascript/sync/server.cljc b/src/datascript/sync/server.cljc new file mode 100644 index 00000000..7319c429 --- /dev/null +++ b/src/datascript/sync/server.cljc @@ -0,0 +1,66 @@ +(ns datascript.sync.server + (:require + [datascript.conn :as conn] + [datascript.db :as db] + [datascript.serialize :as serialize])) + +(defn- client [conn channel] + (-> conn :atom deref :clients (get channel))) + +(defn on-tx [conn report] + (let [clients (:clients @(:atom conn)) + msg {:message :transacted + :tx-data (db/tx-from-datoms (:tx-data report)) + :tx-id (:tx-id (:tx-meta report)) + :server-idx (:db/current-tx (:tempids report))}] + (doseq [[channel {:keys [status send-fn pending]}] clients] + (if (= :active status) + (do + (when pending + (doseq [msg pending] + (send-fn channel msg)) + (swap! (:atom conn) update :clients update client dissoc :pending)) + (send-fn channel msg)) + (swap! (:atom conn) update :clients update client update :pending (fnil conj []) msg))))) + +(defn client-connected [conn channel send-fn] + (let [clients' (:clients + (swap! (:atom conn) update :clients assoc channel + {:status :connected + :send-fn send-fn}))] + (when (= 1 (count clients')) + (conn/listen! conn :sync #(on-tx conn %))) + nil)) + +(defn drop-before [txs server-idx] + (vec + (drop-while #(<= (:server-idx %) server-idx) txs))) + +(defn client-message [conn channel body] + (case (:message body) + :catching-up + (let [{:keys [patterns server-idx]} body ;; TODO delta from server-idx + {:keys [send-fn]} (client conn channel) + db @conn + server-idx (:max-tx db)] + (send-fn channel + {:message :catched-up + :snapshot (serialize/serializable db) ;; TODO patterns + :server-idx server-idx}) + (swap! (:atom conn) update :clients update channel + (fn [client] + (-> client + (assoc :status :active) + (update :pending drop-before server-idx))))) + + :transacting + (doseq [{:keys [tx-data tx-id]} (:txs body)] + ;; TODO handle exception here + (conn/transact! conn tx-data {:tx-id tx-id}))) + nil) + +(defn client-disconnected [conn channel] + (let [clients' (:clients (swap! (:atom conn) update :clients dissoc channel))] + (when (= 0 (count clients')) + (conn/unlisten! conn :sync)) + nil)) diff --git a/src/datascript/util.cljc b/src/datascript/util.cljc index 8cf19e0d..d3bc00a5 100644 --- a/src/datascript/util.cljc +++ b/src/datascript/util.cljc @@ -10,6 +10,10 @@ `(when *debug* (println ~@body))) +(def empty-queue + #?(:clj clojure.lang.PersistentQueue/EMPTY + :cljs cljs.core.PersistentQueue.EMPTY)) + (defn- rand-bits [pow] (rand-int (bit-shift-left 1 pow))) diff --git a/test/datascript/test.cljc b/test/datascript/test.cljc index dc712b0e..085884b2 100644 --- a/test/datascript/test.cljc +++ b/test/datascript/test.cljc @@ -36,6 +36,7 @@ datascript.test.query-v3 datascript.test.serialize #?(:clj datascript.test.storage) + #?(:clj datascript.test.sync) datascript.test.transact datascript.test.tuples datascript.test.validation diff --git a/test/datascript/test/core.cljc b/test/datascript/test/core.cljc index 75922f5a..e1fee50d 100644 --- a/test/datascript/test/core.cljc +++ b/test/datascript/test/core.cljc @@ -97,6 +97,14 @@ #?(:clj (transit-read (.getBytes ^String s "UTF-8") :json) :cljs (transit-read s :json))) +#?(:clj + (def lock (Object.))) + +(defn log [& args] + #?(:clj (locking lock + (apply println args)) + :cljs (apply println args))) + ;; Core tests (deftest test-protocols diff --git a/test/datascript/test/sync.clj b/test/datascript/test/sync.clj new file mode 100644 index 00000000..1a3f1196 --- /dev/null +++ b/test/datascript/test/sync.clj @@ -0,0 +1,87 @@ +(ns datascript.test.sync + (:require + [clojure.core.async :as async :refer [>! ! ch [:c1 (freeze %)]))) + c2 (client/create-conn nil #(go (>! ch [:c2 (freeze %)])))] + (server/client-connected server :c1 (fn [_ msg] (go (>! ch1 (freeze msg))))) + (server/client-connected server :c2 (fn [_ msg] (go (>! ch2 (freeze msg))))) + (go-loop [] + (when-some [msg (