Franzy

Franzy is a suite of Clojure libraries for Apache Kafka. It includes libraries for Kafka consumers, producers, partitioners, callbacks, serializers, and deserializers. Additionally, there are libraries for administration, testing, mocking, running embedded Kafka brokers and zookeeper clusters, and more.

The main goal of Franzy is to make working with Kafka from Clojure easier. Franzy provides a foundation for building higher-level abstractions for whatever your needs might be.

Platform

Franzy breaks its functionality up into several libraries to minimize dependency issues, especially around differing Kafka dependencies (e.g., server vs. consumer/producer).

| Name | Type | Description | Major Dependencies |
|------|------|-------------|--------------------|
| Franzy | client | This library - core client-oriented functionality, i.e. consumer, producer, schemas, and more. | Franzy-Common, Kafka client |
| Franzy Admin | client | Administer Kafka with Clojure, get Clojure data in/out, create topics, add partitions, list brokers, etc. | Franzy-Common, Kafka server (Scala/Java) |
| Franzy Common | lib | Common functionality for any Franzy development, and useful for Kafka in general. | Clojure, Schema |
| Franzy Nippy | de/serializer | Nippy Serializer/Deserializer for Kafka. | Nippy |
| Franzy Transit | de/serializer | Transit Serializer/Deserializer for Kafka. | Transit |
| Franzy JSON | de/serializer | JSON/Smile Serializer/Deserializer for Kafka. | Cheshire |
| Franzy Fressian | de/serializer | Fressian Serializer/Deserializer for Kafka. | Fressian |
| Franzy Avro | de/serializer | Avro Serializer/Deserializer for Kafka. | Abracad |
| Franzy Embedded | embedded broker | Full-featured embedded Kafka server for testing/dev, with multiple implementations including concrete types and components. | Kafka server |
| Franzy Mocks | testing | Test your consumers and producers without a running Kafka cluster, and more in the future. | Franzy, Kafka client |
| Franzy Examples | examples | Growing project of examples using all the above, to learn at your leisure. | All |
| Travel Zoo | embedded Zookeeper | Embedded Zookeeper servers and clusters for testing and development, with concrete type and component versions available. | Curator Test |

Features

Why?

In addition to raw features, some reasons you may want to use Franzy:

Requirements

Requirements may vary slightly depending on your intended usage.

A good way to get started with Kafka is to use Docker and/or Vagrant. I recommend using a Docker compose stack with Kafka and Zookeeper that lets you scale up/down to test. You can also use the embedded Kafka and Zookeeper libraries listed above and discussed in the Testing/Dev section.

Installation

These libraries have had a few weeks of peer review with no issues so far. I will release new versions as I find time in the coming weeks. There are no breaking API changes yet, but I am open to suggested changes or submissions; please let me know, and be ready for an upgrade soon. Thanks for your support.

[ymilky/franzy "0.0.1"]

Clojars Project
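
If you use add-ons, each one is its own artifact. Here is a sketch of how the dependencies might look in a project.clj; the add-on coordinate and the versions shown are illustrative assumptions, so check Clojars for the actual artifact names and current versions:

;;project.clj - pull in only the artifacts you need
(defproject my.group/my-kafka-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [ymilky/franzy "0.0.1"]
                 ;;hypothetical add-on coordinate - verify on Clojars
                 [ymilky/franzy-nippy "0.0.1"]])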

Docs

Usage

The best way to learn is to work through Franzy Examples and to browse the API docs and source.

Below are a few naive examples to get you started.

Serialization

You'll need to pick a format in/out of Kafka.

I recommend you use Franzy-Nippy, but think carefully about your use-case. If you're just getting started, the built-in EDN Serializer is a good choice to keep things simple. Of course, all the built-in serializers in Kafka are accessible as well.

For the built-in serializers/deserializers, simply do something like this:

(ns my.ns
  (:require [franzy.serialization.deserializers :as deserializers]
            [franzy.serialization.serializers :as serializers]))

For the add-ons, you'll have to reference them as separate dependencies. They follow a pattern like this; replace nippy with the serializer/deserializer name:

(ns my.ns
  (:require [franzy.serialization.nippy.deserializers :as deserializers]
            [franzy.serialization.nippy.serializers :as serializers]))

See Serializers for a discussion.
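
As a quick REPL sanity check, here is a round-trip sketch using the built-in EDN pair and the aliases from the first ns example above. It assumes the serializers implement the standard Kafka Serializer/Deserializer interfaces, which take a topic name plus a value or bytes:

;;round-trip a value through the built-in EDN serializer/deserializer
(let [ser (serializers/edn-serializer)
      deser (deserializers/edn-deserializer)
      value {:quest :holy-grail :knights 3}]
  ;;serialize returns bytes, deserialize turns them back into Clojure data
  (= value (.deserialize deser "any-topic" (.serialize ser "any-topic" value))))
;;=> true, if the assumptions above hold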

Producers

For some general information about producers, check the source for many comments, read the browsable API docs, and skim the short but growing producer guide.

There are many ways to use and create producers. Below are a few naive examples of creating producers.
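
The producer examples omit their ns form. They assume requires roughly like the following; the namespace names are inferred from the aliases used (producer, pd, pt, serializers) and should be treated as assumptions, so check the API docs for the exact namespaces:

(ns my.ns
  (:require [franzy.clients.producer.client :as producer]
            [franzy.clients.producer.defaults :as pd]
            [franzy.clients.producer.protocols :refer :all]
            [franzy.clients.producer.types :as pt]
            [franzy.serialization.serializers :as serializers]))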

;;Creating a producer with a few simple config values and options to show what can be done; your usage will vary

(let [pc {:bootstrap.servers ["127.0.0.1:9092"]
          :retries           1
          :batch.size        16384
          :linger.ms         1
          :buffer.memory     33554432}
      ;;normally, just inject these directly; be aware some serializers may need to be closed,
      ;; adding them to the binding here to make this clear
      
      ;;Serializes producer record keys as strings
      key-serializer (serializers/string-serializer)
      ;;Serializes producer record values as strings
      value-serializer (serializers/string-serializer)
      ]
  (with-open [p (producer/make-producer pc key-serializer value-serializer)]
    (partitions-for p "test")))

;;=>
[{:topic "test",
  :partition 0,
  :leader {:id 1001, :host "127.0.0.1", :port 9092},
  :replicas [{:id 1001, :host "127.0.0.1", :port 9092}],
  :in-sync-replicas [{:id 1001, :host "127.0.0.1", :port 9092}]}]

Synchronous and asynchronous production, using a different producer arity:

(let [;;Use a vector if you wish for multiple servers in your cluster
      pc {:bootstrap.servers ["cliffs-of-insanity.guilder:9092" "fire-swamp.guilder:9092"]}
      ;;Serializes producer record keys that may be keywords
      key-serializer (serializers/keyword-serializer)
      ;;Serializes producer record values as EDN, built-in
      value-serializer (serializers/edn-serializer)
      ;;optionally create some options, even just use the defaults explicitly
      ;;for those that don't need anything fancy...
      options (pd/make-default-producer-options)
      topic "land-wars-in-asia"
      partition 0]
  (with-open [p (producer/make-producer pc key-serializer value-serializer options)]
    (let [send-fut (send-async! p topic partition :inconceivable
                                {:things-in-fashion [:masks :giants :kerry-calling-saul]}
                                options)
          record-metadata (send-sync! p "land-wars-in-asia" 0 :conceivable
                                      {:deadly-poisons [:iocaine-powder :ska-music :vegan-cheese]}
                                      options)
          ;;we can also use records to produce, wrapping our per producer record value (data) as usual
          record-metadata-records (send-sync! p (pt/->ProducerRecord topic partition :vizzini
                                                                     {:quotes ["the battle of wits has begun!"
                                                                               "finish him, your way!"]})
                                              options)]
      (println "Sync send results:" record-metadata)
      (println "Sync send results w/record:" record-metadata-records)
      (println "Async send results:" @send-fut))))

Consumer Overview

In order to consume data from Kafka, you must have one or more valid partitions assigned per topic. There are two ways to get a partition assignment from Kafka: manually and automatically.

The manual-assignment case is well suited to users who are tracking offsets manually or want to override and commit offset positions. Automatic assignment is performed via subscription and best used when storing offsets in Kafka itself.

The process to consume data from Kafka follows a pattern roughly like this: create a consumer with a config and key/value deserializers, get a partition assignment (by subscribing to topics or assigning partitions manually), position the consumer at an offset, poll for records, process them, commit offsets (to Kafka or your own store) if you are not auto-committing, and repeat until you close the consumer.

It is vitally important that you understand the implications of threading, polling, partition assignments, and offsets. This is documented in the official Kafka docs and the consumer Java API docs; go read them, now.

This is perhaps an over-simplification as there are a few other nuances.

For the impatient among you, the major differences between the “manual” consumer and “subscription” or “automatic” consumer are generally offset management and partition assignment.

Franzy lets you choose, and there's nothing stopping you from manually assigning offsets. Likewise, there's nothing stopping you from committing offsets to Kafka itself manually (rather than Zookeeper, Redis, Aerospike, Cassandra, etc.).

You will find most of what works for the manual and subscription consumers is the same. The details are mainly in which protocols you use. If you want to force one paradigm over another, simply don't call or require the protocols of the other.

It is extremely important to note that consuming via subscriptions and using manual assignments are mutually exclusive. If you attempt to do so at the same time, your code will fail. Although Kafka itself will protect you from any adverse effects of this behavior by throwing an exception, it is not guaranteed that this behavior will remain in future versions as the Kafka API changes.

The examples given here do not create threads for the sake of simplicity. I leave this as an exercise to you as your use-cases will determine your threading model. My colleagues and I prefer to use core.async which makes it simple to transduce results directly on to a channel, manage thread lifecycles, etc. We've also used core.async with Manifold to great success. Understand that when you poll, work with offsets, etc., many of these are blocking operations.
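
As a rough illustration of that approach, here is a minimal sketch of a poll loop feeding a core.async channel. It is not the library's prescribed pattern; it assumes a consumer c that is already subscribed or assigned, and uses poll! as shown in the examples below:

(require '[clojure.core.async :as async])

(defn poll-onto-channel!
  "Polls the consumer on a dedicated thread, putting each consumer record
  onto ch. The loop stops once ch is closed."
  [c ch]
  (async/thread
    (loop []
      ;;>!! returns false when ch is closed, which ends the loop
      (when (reduce (fn [ok? record] (and ok? (async/>!! ch record)))
                    true
                    (poll! c))
        (recur)))))

Because poll! and >!! both block, this runs on async/thread rather than inside a go block; downstream consumers of ch can then apply transducers or fan the records out however they like.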

Manual Consumer

Below, we create a manual consumer and demo a bit of the important assignment capabilities you might want to use when working with a manual consumer:
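
As with the producer examples, the consumer examples assume requires along these lines; again, the namespace names are inferred from the aliases (consumer, cd, deserializers) and are assumptions rather than confirmed paths:

(ns my.ns
  (:require [franzy.clients.consumer.client :as consumer]
            [franzy.clients.consumer.defaults :as cd]
            [franzy.clients.consumer.protocols :refer :all]
            ;;callback constructors such as consumer-rebalance-listener and
            ;;offset-commit-callback live in a callbacks namespace - see the API docs
            [franzy.serialization.deserializers :as deserializers]))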

  (let [cc {:bootstrap.servers ["127.0.0.1:9092"]
            :group.id          "hungry-eels"
            :auto.offset.reset :earliest}
        ;;notice now we are using keywords, to ensure things go as we planned when serializing
        key-deserializer (deserializers/keyword-deserializer)
        ;;notice now we are using an EDN deserializer to ensure we get back the data correctly
        value-deserializer (deserializers/edn-deserializer)
        options (cd/make-default-consumer-options)
        topic "land-wars-in-asia"
        ;;notice, we are setting the topics and partitions to what we produced to earlier...
        topic-partitions [{:topic topic :partition 0}]]
    (with-open [c (consumer/make-consumer cc key-deserializer value-deserializer options)]
      ;;first, lets get some information about the currently available topic partitions...
      ;;we will see a list of topics, along with partition info for each one
      (println "Topic Partition Info per Topic:" (list-topics c))
      ;;maybe you just want an eager list of topics, that's it....a simple solution with many possible solutions
      (println "An inefficient vector of topics:" (->> (list-topics c)
                                                       ;;or (keys), but here we want to stringify our keys a bit
                                                       (into [] (map (fn [[k _]] (name k))))))
      ;;something more specific in scope
      (println "Topic Partitions for our topic:" (partitions-for c topic))
      ;;now let us manually assign a partition
      ;;if you really wanted some dynamic behavior, you could use some of the results above from list-topics
      (assign-partitions! c topic-partitions)
      ;;list the assigned partitions - shocking revelations follow:
      (println "Assigned Partitions:" (assigned-partitions c))
      ;;now lets say we don't like to be labeled, and thus, we don't want any more assignments
      (println "Clearing partition assignments....")
      (clear-partition-assignments! c)
      (println "After clearing all partition assignments, we have exactly this many assignments (correlates to wall-street accountability):"
               (assigned-partitions c))))

Subscription-based Consumer

Below, we create a subscription-based consumer that auto-commits its offsets to Kafka. A point of interest that applies both to the subscription-based consumer and the manual consumer is working with consumer records.

(let [cc {:bootstrap.servers       ["127.0.0.1:9092"]
          :group.id                "submissive-blonde-aussies"
          ;;jump as early as we can, note this isn't necessarily 0
          :auto.offset.reset       :earliest
          ;;here we turn on committing offsets to Kafka itself, every 1000 ms
          :enable.auto.commit      true
          :auto.commit.interval.ms 1000}
      key-deserializer (deserializers/keyword-deserializer)
      value-deserializer (deserializers/edn-deserializer)
      topic "land-wars-in-asia"
      ;;Here we are demonstrating the use of a consumer rebalance listener. Normally you'd use this with a manual consumer to deal with offset management.
      ;;As more consumers join the consumer group, this callback should get fired, among other triggers.
      ;;To implement a manual consumer without this function is folly, unless you don't mind losing data, and probably your job.
      ;;One could argue though that most data is not as valuable as we are told. I heard this in a dream once or in intro to Philosophy.
      rebalance-listener (consumer-rebalance-listener (fn [topic-partitions]
                                                        (println "topic partitions assigned:" topic-partitions))
                                                      (fn [topic-partitions]
                                                        (println "topic partitions revoked:" topic-partitions)))
      ;;We create custom consumer options and set our listener callback like so.
      ;;Now we can avoid passing this callback on every call that requires it, if we so desire
      ;;Avoiding the extra cost of creating and garbage collecting a listener is a best practice
      options (cd/make-default-consumer-options {:rebalance-listener-callback rebalance-listener})]
  (with-open [c (consumer/make-consumer cc key-deserializer value-deserializer options)]
    ;;Note! - The subscription will read your committed offsets to position the consumer accordingly
    ;;If you see no data, try changing the consumer group temporarily
    ;;If still nothing, have a look inside Kafka itself, perhaps with franzy-admin!
    ;;Alternatively, you can set up another thread that will produce to your topic while you consume, and all should be well
    (subscribe-to-partitions! c [topic])
    ;;Let's see what we subscribed to, we don't need Cumberbatch to investigate here...
    (println "Partitions subscribed to:" (partition-subscriptions c))
    ;;now we poll and see if there's any fun stuff for us
    (let [cr (poll! c)
          ;;a naive transducer, written the long way
          filter-xf (filter (fn [cr] (= (:key cr) :inconceivable)))
          ;;a naive transducer for viewing the values, again long way
          value-xf (map (fn [cr] (:value cr)))
          ;;more misguided transducers
          inconceivable-transduction (comp filter-xf value-xf)]

      (println "Record count:" (record-count cr))
      (println "Records by topic:" (records-by-topic cr topic))
      ;;;The source data is a seq, be careful!
      (println "Records from a topic that doesn't exist:" (records-by-topic cr "no-one-of-consequence"))
      (println "Records by topic partition:" (records-by-topic-partition cr topic 0))
      ;;;The source data is a list, so no worries here....
      (println "Records by a topic partition that doesn't exist:" (records-by-topic-partition cr "no-one-of-consequence" 99))
      (println "Topic Partitions in the result set:" (record-partitions cr))
      (clojure.pprint/pprint (into [] inconceivable-transduction cr))
      ;(println "Now just the values of all distinct records:")
      (println "Put all the records into a vector (calls IReduceInit):" (into [] cr))
      ;;wow, that was tiring, maybe now we don't want to listen anymore to this topic and take a break, maybe subscribe
      ;;to something else next poll....
      (clear-subscriptions! c)
      (println "After clearing subscriptions, a stunning development! We are now subscribed to the following partitions:"
               (partition-subscriptions c)))))

Working with Consumer Results

You may have noticed in earlier examples we were binding the result set and then doing operations like take, into, etc. on it. That's because Kafka returns richer results than just a map, though you can simply skip the binding and use a map if you like.

Consumers get results by polling Kafka until a timeout, then polling again, over and over. What a consumer gets back depends on its own position and on the other consumers in its consumer group. A poll returns a “consumer records” object containing zero or more “consumer record” instances. Franzy uses Clojure records to represent these for speed, a smaller memory footprint, and ease of use.

If you don't understand how Kafka consumers work, you must read more of the official Kafka documentation. This is crucial for any Kafka client library to be useful and not seem “broken” to you.

A common problem that new Kafka users have is that they do not understand the consumption model. Many new users assume the server or the client library must be broken when no results are returned. Take a moment, ensure you understand, read through franzy-examples, and try some examples interactively with your cluster.

The following capabilities are available to you when working with consumer records, as demonstrated in the example above: counting the result set (record-count), slicing it by topic (records-by-topic) or by topic partition (records-by-topic-partition), listing the topic partitions present in the result set (record-partitions), and reducing or pouring the records into a collection (the result set implements IReduceInit, so into, reduce, and transducers all work directly on it).

Offset Management

Franzy supports both automatically committing offsets to Kafka and manually managing offsets yourself.

Callbacks for offset commits, consumer rebalance events, and more are provided to help you with this process depending on your needs. If you are managing your own offsets, please use a highly available datastore that is reasonably fast. You may trade reliability for speed if you don't care about losing data.
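
As a toy illustration of hooking your own bookkeeping into commits, here is a sketch where an atom stands in for that highly available datastore. offset-commit-callback is the same constructor used in the example further below; the shape of the value passed to the success handler is an assumption, so the sketch just keeps the raw value:

(def offset-store (atom []))

(def store-offsets-callback
  (offset-commit-callback
    (fn [offset-metadata]
      ;;the committed offset metadata is stored as-is, whatever its shape
      (swap! offset-store conj offset-metadata))
    (fn [e]
      (println "offset commit failed, nothing stored:" e))))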

The following code demonstrates some offset management operations and gotchas with Kafka that newcomers often struggle with:

(let [cc {:bootstrap.servers ["127.0.0.1:9092"]
          :group.id          "hungry-eels"
          :auto.offset.reset :earliest}
      key-deserializer (deserializers/keyword-deserializer)
      value-deserializer (deserializers/edn-deserializer)
      options (cd/make-default-consumer-options)
      topic "land-wars-in-asia"
      first-topic-partition {:topic topic :partition 0}
      second-topic-partition {:topic topic :partition 1}
      topic-partitions [first-topic-partition second-topic-partition]]
  (with-open [c (consumer/make-consumer cc key-deserializer value-deserializer options)]
    ;;first we'll make sure we can assign some partitions. We could also subscribe instead, but for examples, this is easier.
    (assign-partitions! c topic-partitions)
    (seek-to-beginning-offset! c topic-partitions)
    ;;let's peek at what the next offset is.....it should be 0 if we're at the beginning
    (println "Next offset:" (next-offset c first-topic-partition))
    ;;now maybe we want to save some metadata about the beginning offset....
    ;;Notice, we're sending a map with the keys a topic partition map as the key, and the value as an offset metadata map
    (commit-offsets-sync! c {first-topic-partition {:offset 0, :metadata "In the beginning.....that was a long time ago."}})
    ;;Now let's have a peek at what we committed. If you've done this before, there might be other data obviously
    (println "Committed offsets so far:" (committed-offsets c first-topic-partition))
    ;;Now let's commit the next offset (there should be one if you produced data already), but this time async
    (commit-offsets-async! c {first-topic-partition {:offset 1 :metadata "Those who count from one, are but two."}})
    ;;Another peek at the results, but this might surprise you if your thinking cap is at the cleaners
    (println "Committed offsets after first async call:" (committed-offsets c first-topic-partition))
    ;;The problem here is you passed the offsets as the options map! Don't do it.
    ;; OK, if not then what about other arities?
    (commit-offsets-async! c {first-topic-partition {:offset 1 :metadata "Those who count from one, are but two."}} nil)
    (println "Committed offsets after proper async call:" (committed-offsets c first-topic-partition))
    ;;Nope, still no new data, but what about doing it sync
    (commit-offsets-sync! c {first-topic-partition {:offset 1 :metadata "Those who count from one, are but two."}})
    (println "Committed offsets after 2nd sync call:" (committed-offsets c first-topic-partition))
    ;;OK, great, doing it sync worked, but why?
    ;;Let's create some callbacks so we have a better idea what is going on
    ;;We could use these to do all kinds of fun stuff, like store this metadata in our own shiny database
    (let [occ (offset-commit-callback (fn [offset-metadata]
                                        (println "By the wind shalt be, commit succeeded:" offset-metadata))
                                      (fn [e]
                                        (println "Offsets failed to commit, just like you:" e)))]
      ;;notice the different arity and the fact we pass our callback.
      ;; We could have also just set this in the consumer options, in which case, there would be no need to use this arity
      ;; Unless the callback changed per-call, in which case, someone somewhere has read your code, then engaged the grumble-drive.
      ;;beware of committing async offsets in a separate thread from the poller
      (commit-offsets-async! c {first-topic-partition {:offset 2 :metadata "A Nancy to a Tanya"}} {:offset-commit-callback occ})
      (println "Committed offsets after async callback version:" (committed-offsets c first-topic-partition))
      ;;ok, why are there still no offsets?
      ;;let's try to follow the Franzy documentation! READ IT!
      ;;first, let's poll from offset 2, so we'll need to seek to it
      (seek-to-offset! c first-topic-partition 2)
      ;;and to poll, results are not important as long as we got at least 1 - you did populate the data, didn't you?
      (poll! c)
      (commit-offsets-async! c {first-topic-partition {:offset 2 :metadata "A Nancy to a Tanya"}} {:offset-commit-callback occ})
      (println "Committed offsets after listening to the doc about polling with async commits:" (committed-offsets c first-topic-partition))
      ;;all is well, that was certainly traumatic.....
      )))

Metrics

Producing and consuming from Kafka is not an exact science. Fortunately, Kafka provides a metrics API that you can use in addition to any self-collected metrics to help determine when to use back-pressure, spawn threads, kill threads, fire up new machines, etc.

The code for a producer and consumer uses the same protocol. The consumer case is demonstrated below.

  (let [cc {:bootstrap.servers ["127.0.0.1:9092"]
            :group.id          "mawage"}
        key-deserializer (deserializers/keyword-deserializer)
        value-deserializer (deserializers/edn-deserializer)
        options (cd/make-default-consumer-options)]
    (with-open [c (consumer/make-consumer cc key-deserializer value-deserializer options)]
      ;;Now let's say we want to know something about how consuming is going. Perhaps we are too greedy.
      ;;We can get a plethora of metrics, log them, exert back-pressure on the producer if needed, eject, etc.
      ;;All of this, by parsing this wonderful thing below. JMXers, rejoice.
      ;; If there is more of a demand, we can add more transducers, helpers, etc. for metrics
      ;;WARNING - prepare your REPL for a feast. You won't receive any real values unless you've kept the consumer consuming....
      (metrics c)))

;;=> example output....truncated
{:name "io-ratio",
  :description "The fraction of time the I/O thread spent doing I/O",
  :group "consumer-metrics",
  :tags {:client-id "consumer-55"}} {:metric-name {:name "io-ratio",
                                                   :description "The fraction of time the I/O thread spent doing I/O",
                                                   :group "consumer-metrics",
                                                   :tags {:client-id "consumer-55"}},
                                     :value 0.0}

Partitioners

The default partitioners are provided if you want to pass them around in Clojure or use them in consumer configuration:

(ns my.ns
  (:require [franzy.clients.consumer.partitioners :as partitioners]))

If you want to control how partition assignments are laid out and handed to consumers, you can implement your own partitioner. A FranzPartitionAssignor protocol is also available so you can write your partition assignor as a protocol implementation, for example via defrecord or deftype, and then call make-partition-assignor to turn it into a valid interface implementation. For better performance, prefer implementing the PartitionAssignor interface directly; the protocol is only meant as a crude shim for existing code.

Types/Records

Various concrete type implementations are scattered throughout Franzy and its add-ons. Using these can give you a performance boost, reduce memory consumption, and avoid reflection. This is particularly relevant if you find yourself allocating large batches of maps for production, consumption, admin bulk operations, offset management, etc.

Most records are stored in types.clj files across many namespaces. These correspond directly to the context in which they are used, i.e. common, consumer, producer, admin, etc.

For example, you can use the ProducerRecord Clojure record like-so:

(ns my.ns
  (:require [franzy.clients.producer.types :as pt]))

;;allocating a few hundred thousand of these, it's just a record....  
(pt/->ProducerRecord topic partition my-glorious-key my-odoriferous-eminating-value)

More importantly, these can easily be validated since each type has a corresponding schema, usually in a schema.clj file.

Validation

Franzy provides validation for all map structures and data types used. You may use this functionality even if you are not interested in the rest of Franzy.

Schemas are created and validated using the excellent Schema library.
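
The validation examples below assume requires like these; the exact schema namespace is an assumption based on the cs alias, so check the source for the real one:

(ns my.ns
  (:require [schema.core :as s]
            [franzy.clients.consumer.schema :as cs]))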

Simple configuration validation example:

(let [cc {:bootstrap.servers ["127.0.0.1:9092"]}]
    (s/validate cs/ConsumerConfig cc))

;;valid config
;;=> {:bootstrap.servers ["127.0.0.1:9092"]}

(let [cc {:bootstrap.servers ["127.0.0.1:8080"]
          :timeout.typo 123
          :auto.commit.interval.ms "forever"
          :auto.offset.reset nil}]
    (s/validate cs/ConsumerConfig cc))

;;=> ExceptionInfo Value does not match schema: {:auto.offset.reset (not (#{:latest :earliest :none} nil)), :auto.commit.interval.ms (throws? (GreaterThanOrEqualToZero? "forever")), :timeout.typo disallowed-key}  schema.core/validator/fn--4313 (core.clj:151)

Schemas are constantly being tweaked and exist for the following so far:

Most of the above will help you validate just about any map that goes in or out of Kafka, should you choose. Validation can be toggled on or off thanks to Schema and how you structure your own code, as sketched below.
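
For example, one simple way to make validation optional uses plain Schema calls and nothing Franzy-specific; s/check returns nil for a valid value instead of throwing:

(require '[schema.core :as s])

(defn valid-config?
  "Returns true when validation is disabled, or when the config matches
  the given schema."
  [validate? schema config]
  (or (not validate?)
      (nil? (s/check schema config))))

;;(valid-config? true cs/ConsumerConfig {:bootstrap.servers ["127.0.0.1:9092"]})
;;=> true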

Broker Discovery

If you need to discover one or more available brokers, there are a few ways to do this:

Coming soon: helper functions for transducing the results of the above into useful forms for use cases such as bootstrap.servers.

Testing/Dev

You can run an embedded cluster of Zookeeper and/or Kafka using Franzy Embedded (embedded Kafka brokers) and Travel Zoo (embedded Zookeeper servers and clusters), either together or in conjunction with other libraries that provide similar functionality.

Both libraries provide concrete types (for auditing, avoiding reflection, and ease of use) as well as protocols and Component-based versions. Both also have full Clojure APIs.

Another option is to use Docker containers. I have successfully tested both of the above options mixed with Docker without issue. In general, be sure all servers can see each other on the network to avoid problems.

Here's at least one Docker image that uses Docker Compose and includes Zookeeper and Kafka.

Contributing/Roadmap

This library is still very young and is surely filled with bugs. Pull requests are welcome.

The following items are planned or have received partial development. I make no guarantees on timelines, but I plan to release some of them in conjunction with getting other real-world work done using them:

Please contact me if any of these are in high demand for you so I can gauge the urgency better.

Of particular concern/value to fix/refactor/enhance currently:

Please be aware many problems/issues may be due to Kafka itself or the Java API. As such, before submitting issues, please check the Kafka official issue trackers first. If there is a reasonable workaround or solution, please leave a note and link to the underlying issues.

Contact

Find me on Clojurians Slack - @ymilky

License

Copyright © 2016 Yossi M. (ymilky).

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.

Use at your own risk; I am not responsible or liable. Please give credit if you use pieces of this library or otherwise; it is much appreciated.

Acknowledgements

Thanks to the following people for advice, support, code, and/or inspiration: