casyn

Clojure client for Cassandra using Thrift AsyncClient.

The entire Cassandra Thrift API (1.2.x) is supported, including basic CQL support (for a pure CQL3 client, try alia). See the commands API for details.

Pooling relies on Apache Commons Pool, but it is open to other implementations via Clojure protocols/multimethods; the same is true for almost every part of the library (cluster, balancer, codecs, failover).

Contributions, suggestions and bug reports are welcome.

Installation

Casyn uses Leiningen 2, but it is compatible with 1.x.

Add the following dependency to your project.clj:

[cc.qbits/casyn "1.3.2"]

Note: it runs on Clojure 1.4+ and is tested against Cassandra 1.2.3 (it should work fine with 1.x.x versions).

Usage

Basics

(use 'qbits.casyn)

(def cl (make-cluster "localhost" 9160 "Keyspace1"))

;; We now create a client function for our future requests
;; This will manage the node selection, connection pooling, and client
;; workflow for every command. It also allows you to set failover and
;; timeout at this level or inherit these from cluster settings.

(def c (client-fn cl))

See the documentation for more details about cluster configuration and client-fn.
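As a sketch of the per-client settings mentioned above (the exact option keys :timeout and :failover are assumptions here; check the client-fn documentation for the real names):

```clojure
;; Hypothetical sketch: :timeout and :failover option keys are
;; assumptions, verify against the client-fn documentation.
(def c2 (client-fn cl
                   :timeout 1000        ;; per-command timeout (assumed)
                   :failover :try-all)) ;; failover strategy (assumed)
```

Options set here would take precedence over the cluster-level settings.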

API calls return Lamina result-channels. From there you have multiple choices: you can simply block and wait for a result/error by dereferencing the returned value:

@(c get-row "colFamily1" "1")

or, since we want to play asynchronously, register success and error callbacks (both are optional):

(require '[lamina.core :as l])

(c get-row "colFamily1" "1"
   :success #(println "It worked, row:" %)
   :error #(println "It failed, error:" %))

But it is often better to use a pipeline to compose async/sync operations. Here we write a new row, then read the entire row back, with an imaginary step in between that could be synchronous or asynchronous (returning a result-channel). A pipeline also returns a result-channel and can be nested within other pipelines, making async workflows and error handling easier to deal with.

@(l/run-pipeline
  (c insert-column "colFamily1" "1" "n0" "value0")
  {:error-handler (fn [e] (println "snap, something went wrong:" e))}
  some-other-async-operation ;; imaginary step, receives the previous result
  (fn [result] (c get-row "colFamily1" (:id result))))

user> ({:name #<byte[] [B@7cc09980>
        :value #<byte[] [B@489de27c>
        :ttl 0
        :timestamp 1332535710069564})

Lamina offers a lot of possibilities.

Encoding/decoding - Schemas

Cassandra/Thrift column names/values are returned as bytes, but you can supply a schema for decoding. Encoding of Clojure data is automatic. Encoding/decoding is open and extensible; see codecs.clj. This also works with CQL query results.

A simple example with a schema:

(defschema test-schema
  :row :utf-8 ;; the row key will be decoded as utf-8
  :columns {:default [:keyword :utf-8]
            ;; :keyword type will be the default decoder for the column name
            ;; and :utf-8 will be the decoder type of the column value

            ;; When a column named :age is encountered, it will
            ;; override the defaults for decoding: the name of the column
            ;; is the key and its decoder type is the value. In this
            ;; example the key is a keyword, but this depends on the
            ;; column :default you set earlier; it could be of any type.
            :exceptions {:age :long
                         :created :date
                         :code :clj}})

@(c put "colFamily1" "7"
    {:age 35
     :name "Max"
     :created (java.util.Date.)
     :code {:foo [{:bar "baz"}]}})

@(c get-row "colFamily1" "7" :schema test-schema)

user> ({:name :age, :value 35, :ttl 0, :timestamp 1332536503948650}
       {:name :name, :value "Max", :ttl 0, :timestamp 1332536503948652}
       {:name :created, :value #inst "2012-08-22T22:34:41.079-00:00", :ttl 0, :timestamp 1332536503948651}
       {:name :code, :value {:foo [{:bar "baz"}]}, :ttl 0, :timestamp 1332536503948652})

A collection of columns can be turned into a regular map; just pass :as :map.

@(c get-row "colFamily1" "7" :schema test-schema :as :map)

user> {:age 35
       :name "Max"
       :created #inst "2012-08-22T22:34:41.079-00:00"
       :code {:foo [{:bar "baz"}]}}

Supported types

:utf-8 :ascii :long :float :double :int :big-int :boolean :keyword :bytes :date :uuid :time-uuid :clj

Note about ASCII: Clojure strings are encoded as UTF-8 by default, so ASCII strings must be passed as bytes, e.g. (.getBytes "meh" "US-ASCII"); specifying :ascii in the schema allows their automatic decoding though. If you want the read/write behavior to be symmetrical, just use :bytes as the schema type and handle this on your side.
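A small sketch of that round trip (the column family, row key, and column name are illustrative):

```clojure
;; Schema where column values are decoded as ASCII strings
(defschema ascii-schema
  :row :utf-8
  :columns {:default [:keyword :ascii]})

;; Write: the value must be passed as ASCII bytes explicitly,
;; since Clojure strings would otherwise be encoded as UTF-8
@(c insert-column "colFamily1" "1" "n0" (.getBytes "meh" "US-ASCII"))

;; Read: :ascii in the schema decodes the bytes back to a string
@(c get-row "colFamily1" "1" :schema ascii-schema)
```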

Composite types

Composite types are also supported and use the same type definitions (they can be used as keys, names, and values); instead of specifying a single type value in the schema, use a vector of types for the actual values. Here the column name will be a composite of 3 different types.

(defschema test-schema
  :row :utf-8
  :columns {:default [[:utf-8 :long :double] :utf-8]})

To create composite values, just use the composite function or the #casyn/composite reader literal; it will mark the collection as composite and encode it accordingly when you execute the query.

(c insert-column "colFamily1" "1" (composite ["meh" 1001 3.14]) "value0")

Querying using composite values is also supported; for a brief overview, see tests.clj or the documentation.
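A hedged sketch of what such a query might look like (the composite-expression helper and its clause keywords are assumptions here, not confirmed API; verify the actual names against tests.clj):

```clojure
;; Hypothetical sketch: composite-expression and the :eq?/:lt?
;; clause keywords are assumptions, see tests.clj for the real API.
@(c get-slice "colFamily1" "1"
   :start  (composite-expression [:eq? "meh"] [:lt? 1001])
   :finish (composite-expression [:eq? "meh"]))
```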

TimeUUIDs

TimeUUIDs are supported via tardis; you will need to use its API to create Type 1 UUIDs, and from there encoding/decoding is automatic.
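Something along these lines (the unique-time-uuid function name is an assumption from memory of the tardis API; double-check against its documentation):

```clojure
(require '[qbits.tardis :as tardis])

;; Create a Type 1 (time-based) UUID with tardis, then store it;
;; :time-uuid in a schema decodes it back automatically.
;; Note: unique-time-uuid is assumed here, check the tardis API.
(def id (tardis/unique-time-uuid (java.util.Date.)))

@(c insert-column "colFamily1" "1" "event-id" id)
```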

Joda Time

Joda Time support is available: require/use qbits.casyn.codecs.joda-time and use :date-time in your schemas.

Clojure serialization

As shown in the previous example, you can also store Clojure data directly (it is the fallback of the encoding protocol); this is done via Nippy, and you just need to indicate :clj as the decoding type in the schema.

Extending type support

The Joda time support is a good example of how it is achieved.

You need to do two things: extend the encoding protocol, and add a defmethod definition for decoding with the keyword you will use in your schemas.

(ns qbits.casyn.codecs.joda-time
  "Encoding/decoding of org.joda.time.DateTime instances"
  (:require
   [qbits.casyn.codecs :refer [ByteBufferEncodable
                               bytes->clojure
                               clojure->byte-buffer]]
   [clj-time.coerce :as ct-c]))

(extend-protocol ByteBufferEncodable
  org.joda.time.DateTime
  (clojure->byte-buffer [dt]
    (clojure->byte-buffer (ct-c/to-long dt))))

(defmethod bytes->clojure :date-time [_ b]
  (ct-c/from-long (bytes->clojure :long b)))

Consistency

Consistency is set per query and is supported on all commands.

(c insert-column "cf" "1" "foo" "bar" :consistency :all)

Or use the with-consistency macro, if you prefer that to explicit arguments:

(with-consistency :all
  @(c get-row "colFamily1" "1")
  @(c get-row "colFamily1" "2"))

The possible values are: :all :any :each-quorum :local-quorum :one :quorum :three :two

Refer to the Cassandra API documentation for details.

Hayt: CQL3 query DSL

Casyn also comes with a CQL3 query DSL, Hayt, that looks very much like Korma or ClojureQL. See the Hayt repository for more details.
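For a taste of the DSL, a query is composed from plain functions and rendered to a CQL string (the :users table and :name column are illustrative):

```clojure
(require '[qbits.hayt :refer [select where ->raw]])

;; Compose the query as data, then render it to a CQL string
(->raw (select :users (where {:name "foo"})))
;; returns a CQL string such as SELECT * FROM users WHERE name = 'foo';
```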

Documentation

See the API documentation or tests for more details.

Some useful pages:

Note: (almost) the entire lib is aliased from qbits.casyn, so you can have everything you need with a single require, as seen in the examples.

Changelog

See CHANGELOG.md

YourKit

Casyn is developed with the help of the YourKit profiler.

YourKit is kindly supporting open source projects with its full-featured Java Profiler. YourKit, LLC is the creator of innovative and intelligent tools for profiling Java and .NET applications. Take a look at YourKit's leading software products: YourKit Java Profiler and YourKit .NET Profiler.

License

Copyright (C) 2012 Max Penet

Distributed under the Eclipse Public License, the same as Clojure.