File: active_support/vendor/memcache-client-1.7.4/memcache.rb

Overview
Module Structure
Class Hierarchy
Code

Overview

Module Structure

  module: <Toplevel Module>
  class: MemCache#13
inherits from
  Object ( Builtin-Module )
has properties
constant: VERSION #18
constant: DEFAULT_OPTIONS #23
constant: DEFAULT_PORT #36
constant: DEFAULT_WEIGHT #41
attribute: namespace [R] #46
attribute: multithread [R] #51
attribute: servers [R] #56
attribute: timeout [R] #62
attribute: failover [R] #68
attribute: logger [R] #73
attribute: no_reply [R] #79
method: initialize / 1 #102
method: inspect #142
method: active? #150
method: readonly? #157
method: servers= / 1 #166
method: decr / 2 #193
method: get / 2 #206
method: fetch / 3 #226
method: get_multi / 1 #255
method: incr / 2 #295
constant: ONE_MB #311
method: set / 4 #313
method: cas / 3 #355
method: add / 4 #393
method: replace / 4 #414
method: append / 2 #435
method: prepend / 2 #455
method: delete / 2 #474
method: flush_all / 1 #497
method: reset #528
method: stats #564
alias: [] get #607
method: []= / 2 #613
method: make_cache_key / 1 #623
method: hash_for / 1 #635
method: get_server_for_key / 2 #642
method: cache_decr / 3 #666
method: cache_get / 2 #681
method: gets / 2 #705
method: cache_get_multi / 2 #740
method: cache_incr / 3 #768
method: with_socket_management / 2 #793
method: with_server / 1 #824
method: handle_error / 2 #843
method: noreply #851
method: request_setup / 1 #859
method: raise_on_error_response! / 1 #866
method: create_continuum_for / 1 #872
method: entry_count_for / 3 #887
method: check_multithread_status! #891
  class: Server#906
inherits from
  Object ( Builtin-Module )
has properties
constant: RETRY_DELAY #912
attribute: host [R] #917
attribute: port [R] #922
attribute: weight [R] #927
attribute: retry [R] #932
attribute: status [R] #937
attribute: logger [R] #939
method: initialize / 4 #945
method: inspect #963
method: alive? #973
method: socket #981
method: connect_to / 3 #1003
method: close #1013
method: mark_dead / 1 #1023
  class: MemCacheError#1038
inherits from
  RuntimeError ( Builtin-Module )
  class: BufferedIO#1040
inherits from
  BufferedIO ( Net )
has properties
constant: BUFSIZE #1041
method: rbuf_fill #1045
method: setsockopt #1058
method: gets #1062
  module: Continuum#1069
has properties
constant: POINTS_PER_SERVER #1070
module method: binary_search / 3 #1073
  class: Entry#1093
inherits from
  Object ( Builtin-Module )
has properties
attribute: value [R] #1094
attribute: server [R] #1095
method: initialize / 2 #1097
method: inspect #1102

Code

   1  $TESTING = defined?($TESTING) && $TESTING
   2 
   3  require 'socket'
   4  require 'thread'
   5  require 'zlib'
   6  require 'digest/sha1'
   7  require 'net/protocol'
   8 
   9  ##
  10  # A Ruby client library for memcached.
  11  #
  12 
  13  class MemCache
  14 
  15    ##
  16    # The version of MemCache you are using.
  17 
  18    VERSION = '1.7.4'
  19 
  20    ##
  21    # Default options for the cache object.
  22 
  23    DEFAULT_OPTIONS = {
  24      :namespace   => nil,
  25      :readonly    => false,
  26      :multithread => true,
  27      :failover    => true,
  28      :timeout     => 0.5,
  29      :logger      => nil,
  30      :no_reply    => false,
  31    }
  32 
  33    ##
  34    # Default memcached port.
  35 
  36    DEFAULT_PORT = 11211
  37 
  38    ##
  39    # Default memcached server weight.
  40 
  41    DEFAULT_WEIGHT = 1
  42 
  43    ##
  44    # The namespace for this instance
  45 
  46    attr_reader :namespace
  47 
  48    ##
  49    # The multithread setting for this instance
  50 
  51    attr_reader :multithread
  52 
  53    ##
  54    # The servers this client talks to.  Play at your own peril.
  55 
  56    attr_reader :servers
  57 
  58    ##
  59    # Socket timeout limit with this client, defaults to 0.5 sec.
  60    # Set to nil to disable timeouts.
  61 
  62    attr_reader :timeout
  63 
  64    ##
  65    # Should the client try to failover to another server if the
  66    # first server is down?  Defaults to true.
  67 
  68    attr_reader :failover
  69 
  70    ##
  71    # Log debug/info/warn/error to the given Logger, defaults to nil.
  72 
  73    attr_reader :logger
  74 
  75    ##
  76    # Don't send or look for a reply from the memcached server for write operations.
  77    # Please note this feature only works in memcached 1.2.5 and later.  Earlier
  78    # versions will reply with "ERROR".
  79    attr_reader :no_reply
  80 
  81    ##
  82    # Accepts a list of +servers+ and a list of +opts+.  +servers+ may be
  83    # omitted.  See +servers=+ for acceptable server list arguments.
  84    #
  85    # Valid options for +opts+ are:
  86    #
  87    #   [:namespace]   Prepends this value to all keys added or retrieved.
  88    #   [:readonly]    Raises an exception on cache writes when true.
  89    #   [:multithread] Wraps cache access in a Mutex for thread safety. Defaults to true.
  90    #   [:failover]    Should the client try to failover to another server if the
  91    #                  first server is down?  Defaults to true.
  92    #   [:timeout]     Time to use as the socket read timeout.  Defaults to 0.5 sec,
  93    #                  set to nil to disable timeouts (this is a major performance penalty in Ruby 1.8,
  94    #                  "gem install SystemTimer' to remove most of the penalty).
  95    #   [:logger]      Logger to use for info/debug output, defaults to nil
  96    #   [:no_reply]    Don't bother looking for a reply for write operations (i.e. they
  97    #                  become 'fire and forget'), memcached 1.2.5 and later only, speeds up
  98    #                  set/add/delete/incr/decr significantly.
  99    #
 100    # Other options are ignored.
 101 
 102    def initialize(*args)
 103      servers = []
 104      opts = {}
 105 
 106      case args.length
 107      when 0 then # NOP
 108      when 1 then
 109        arg = args.shift
 110        case arg
 111        when Hash   then opts = arg
 112        when Array  then servers = arg
 113        when String then servers = [arg]
 114        else raise ArgumentError, 'first argument must be Array, Hash or String'
 115        end
 116      when 2 then
 117        servers, opts = args
 118      else
 119        raise ArgumentError, "wrong number of arguments (#{args.length} for 2)"
 120      end
 121 
 122      opts = DEFAULT_OPTIONS.merge opts
 123      @namespace   = opts[:namespace]
 124      @readonly    = opts[:readonly]
 125      @multithread = opts[:multithread]
 126      @timeout     = opts[:timeout]
 127      @failover    = opts[:failover]
 128      @logger      = opts[:logger]
 129      @no_reply    = opts[:no_reply]
 130      @mutex       = Mutex.new if @multithread
 131 
 132      logger.info { "memcache-client #{VERSION} #{Array(servers).inspect}" } if logger
 133 
 134      Thread.current[:memcache_client] = self.object_id if !@multithread
 135 
 136      self.servers = servers
 137    end
 138 
 139    ##
 140    # Returns a string representation of the cache object.
 141 
 142    def inspect
 143      "<MemCache: %d servers, ns: %p, ro: %p>" %
 144        [@servers.length, @namespace, @readonly]
 145    end
 146 
 147    ##
 148    # Returns whether there is at least one active server for the object.
 149 
 150    def active?
 151      not @servers.empty?
 152    end
 153 
 154    ##
 155    # Returns whether or not the cache object was created read only.
 156 
 157    def readonly?
 158      @readonly
 159    end
 160 
 161    ##
 162    # Set the servers that the requests will be distributed between.  Entries
 163    # can be either strings of the form "hostname:port" or
 164    # "hostname:port:weight" or MemCache::Server objects.
 165    #
 166    def servers=(servers)
 167      # Create the server objects.
 168      @servers = Array(servers).collect do |server|
 169        case server
 170        when String
 171          host, port, weight = server.split ':', 3
 172          port ||= DEFAULT_PORT
 173          weight ||= DEFAULT_WEIGHT
 174          Server.new self, host, port, weight
 175        else
 176          server
 177        end
 178      end
 179 
 180      logger.debug { "Servers now: #{@servers.inspect}" } if logger
 181 
 182      # There's no point in doing this if there's only one server
 183      @continuum = create_continuum_for(@servers) if @servers.size > 1
 184 
 185      @servers
 186    end
 187 
 188    ##
 189    # Decrements the value for +key+ by +amount+ and returns the new value.
 190    # +key+ must already exist.  If +key+ is not an integer, it is assumed to be
 191    # 0.  +key+ can not be decremented below 0.
 192 
 193    def decr(key, amount = 1)
 194      raise MemCacheError, "Update of readonly cache" if @readonly
 195      with_server(key) do |server, cache_key|
 196        cache_decr server, cache_key, amount
 197      end
 198    rescue TypeError => err
 199      handle_error nil, err
 200    end
 201 
 202    ##
 203    # Retrieves +key+ from memcache.  If +raw+ is false, the value will be
 204    # unmarshalled.
 205 
 206    def get(key, raw = false)
 207      with_server(key) do |server, cache_key|
 208        logger.debug { "get #{key} from #{server.inspect}" } if logger
 209        value = cache_get server, cache_key
 210        return nil if value.nil?
 211        value = Marshal.load value unless raw
 212        return value
 213      end
 214    rescue TypeError => err
 215      handle_error nil, err
 216    end
 217 
 218    ##
 219    # Performs a +get+ with the given +key+.  If 
 220    # the value does not exist and a block was given,
 221    # the block will be called and the result saved via +add+.
 222    #
 223    # If you do not provide a block, using this
 224    # method is the same as using +get+.
 225    #
 226    def fetch(key, expiry = 0, raw = false)
 227      value = get(key, raw)
 228 
 229      if value.nil? && block_given?
 230        value = yield
 231        add(key, value, expiry, raw)
 232      end
 233 
 234      value
 235    end
 236 
 237    ##
 238    # Retrieves multiple values from memcached in parallel, if possible.
 239    #
 240    # The memcached protocol supports the ability to retrieve multiple
 241    # keys in a single request.  Pass in an array of keys to this method
 242    # and it will:
 243    #
 244    # 1. map the key to the appropriate memcached server
 245    # 2. send a single request to each server that has one or more key values
 246    #
 247    # Returns a hash of values.
 248    #
 249    #   cache["a"] = 1
 250    #   cache["b"] = 2
 251    #   cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 }
 252    #
 253    # Note that get_multi assumes the values are marshalled.
 254 
 255    def get_multi(*keys)
 256      raise MemCacheError, 'No active servers' unless active?
 257 
 258      keys.flatten!
 259      key_count = keys.length
 260      cache_keys = {}
 261      server_keys = Hash.new { |h,k| h[k] = [] }
 262 
 263      # map keys to servers
 264      keys.each do |key|
 265        server, cache_key = request_setup key
 266        cache_keys[cache_key] = key
 267        server_keys[server] << cache_key
 268      end
 269 
 270      results = {}
 271 
 272      server_keys.each do |server, keys_for_server|
 273        keys_for_server_str = keys_for_server.join ' '
 274        begin
 275          values = cache_get_multi server, keys_for_server_str
 276          values.each do |key, value|
 277            results[cache_keys[key]] = Marshal.load value
 278          end
 279        rescue IndexError => e
 280          # Ignore this server and try the others
 281          logger.warn { "Unable to retrieve #{keys_for_server.size} elements from #{server.inspect}: #{e.message}"} if logger
 282        end
 283      end
 284 
 285      return results
 286    rescue TypeError => err
 287      handle_error nil, err
 288    end
 289 
 290    ##
 291    # Increments the value for +key+ by +amount+ and returns the new value.
 292    # +key+ must already exist.  If +key+ is not an integer, it is assumed to be
 293    # 0.
 294 
 295    def incr(key, amount = 1)
 296      raise MemCacheError, "Update of readonly cache" if @readonly
 297      with_server(key) do |server, cache_key|
 298        cache_incr server, cache_key, amount
 299      end
 300    rescue TypeError => err
 301      handle_error nil, err
 302    end
 303 
 304    ##
 305    # Add +key+ to the cache with value +value+ that expires in +expiry+
 306    # seconds.  If +raw+ is true, +value+ will not be Marshalled.
 307    #
 308    # Warning: Readers should not call this method in the event of a cache miss;
 309    # see MemCache#add.
 310 
 311    ONE_MB = 1024 * 1024
 312 
 313    def set(key, value, expiry = 0, raw = false)
 314      raise MemCacheError, "Update of readonly cache" if @readonly
 315      with_server(key) do |server, cache_key|
 316 
 317        value = Marshal.dump value unless raw
 318        logger.debug { "set #{key} to #{server.inspect}: #{value.to_s.size}" } if logger
 319 
 320        raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if value.to_s.size > ONE_MB
 321 
 322        command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
 323 
 324        with_socket_management(server) do |socket|
 325          socket.write command
 326          break nil if @no_reply
 327          result = socket.gets
 328          raise_on_error_response! result
 329 
 330          if result.nil?
 331            server.close
 332            raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
 333          end
 334 
 335          result
 336        end
 337      end
 338    end
 339 
 340    ##
 341    # "cas" is a check and set operation which means "store this data but
 342    # only if no one else has updated since I last fetched it."  This can
 343    # be used as a form of optimistic locking.
 344    #
 345    # Works in block form like so:
 346    #   cache.cas('some-key') do |value|
 347    #     value + 1
 348    #   end
 349    #
 350    # Returns:
 351    # +nil+ if the value was not found on the memcached server.
 352    # +STORED+ if the value was updated successfully
 353    # +EXISTS+ if the value was updated by someone else since last fetch
 354 
 355    def cas(key, expiry=0, raw=false)
 356      raise MemCacheError, "Update of readonly cache" if @readonly
 357      raise MemCacheError, "A block is required" unless block_given?
 358 
 359      (value, token) = gets(key, raw)
 360      return nil unless value
 361      updated = yield value
 362 
 363      with_server(key) do |server, cache_key|
 364 
 365        value = Marshal.dump updated unless raw
 366        logger.debug { "cas #{key} to #{server.inspect}: #{value.to_s.size}" } if logger
 367        command = "cas #{cache_key} 0 #{expiry} #{value.to_s.size} #{token}#{noreply}\r\n#{value}\r\n"
 368 
 369        with_socket_management(server) do |socket|
 370          socket.write command
 371          break nil if @no_reply
 372          result = socket.gets
 373          raise_on_error_response! result
 374 
 375          if result.nil?
 376            server.close
 377            raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
 378          end
 379 
 380          result
 381        end
 382      end
 383    end
 384 
 385    ##
 386    # Add +key+ to the cache with value +value+ that expires in +expiry+
 387    # seconds, but only if +key+ does not already exist in the cache.
 388    # If +raw+ is true, +value+ will not be Marshalled.
 389    #
 390    # Readers should call this method in the event of a cache miss, not
 391    # MemCache#set.
 392 
 393    def add(key, value, expiry = 0, raw = false)
 394      raise MemCacheError, "Update of readonly cache" if @readonly
 395      with_server(key) do |server, cache_key|
 396        value = Marshal.dump value unless raw
 397        logger.debug { "add #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
 398        command = "add #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
 399 
 400        with_socket_management(server) do |socket|
 401          socket.write command
 402          break nil if @no_reply
 403          result = socket.gets
 404          raise_on_error_response! result
 405          result
 406        end
 407      end
 408    end
 409    
 410    ##
 411    # Add +key+ to the cache with value +value+ that expires in +expiry+
 412    # seconds, but only if +key+ already exists in the cache.
 413    # If +raw+ is true, +value+ will not be Marshalled.
 414    def replace(key, value, expiry = 0, raw = false)
 415      raise MemCacheError, "Update of readonly cache" if @readonly
 416      with_server(key) do |server, cache_key|
 417        value = Marshal.dump value unless raw
 418        logger.debug { "replace #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
 419        command = "replace #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
 420 
 421        with_socket_management(server) do |socket|
 422          socket.write command
 423          break nil if @no_reply
 424          result = socket.gets
 425          raise_on_error_response! result
 426          result
 427        end
 428      end
 429    end
 430 
 431    ##
 432    # Append - 'add this data to an existing key after existing data'
 433    # Please note the value is always passed to memcached as raw since it
 434    # doesn't make a lot of sense to concatenate marshalled data together.
 435    def append(key, value)
 436      raise MemCacheError, "Update of readonly cache" if @readonly
 437      with_server(key) do |server, cache_key|
 438        logger.debug { "append #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
 439        command = "append #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
 440 
 441        with_socket_management(server) do |socket|
 442          socket.write command
 443          break nil if @no_reply
 444          result = socket.gets
 445          raise_on_error_response! result
 446          result
 447        end
 448      end
 449    end
 450 
 451    ##
 452    # Prepend - 'add this data to an existing key before existing data'
 453    # Please note the value is always passed to memcached as raw since it
 454    # doesn't make a lot of sense to concatenate marshalled data together.
 455    def prepend(key, value)
 456      raise MemCacheError, "Update of readonly cache" if @readonly
 457      with_server(key) do |server, cache_key|
 458        logger.debug { "prepend #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
 459        command = "prepend #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
 460 
 461        with_socket_management(server) do |socket|
 462          socket.write command
 463          break nil if @no_reply
 464          result = socket.gets
 465          raise_on_error_response! result
 466          result
 467        end
 468      end
 469    end
 470 
 471    ##
 472    # Removes +key+ from the cache in +expiry+ seconds.
 473 
 474    def delete(key, expiry = 0)
 475      raise MemCacheError, "Update of readonly cache" if @readonly
 476      with_server(key) do |server, cache_key|
 477        with_socket_management(server) do |socket|
 478          logger.debug { "delete #{cache_key} on #{server}" } if logger
 479          socket.write "delete #{cache_key} #{expiry}#{noreply}\r\n"
 480          break nil if @no_reply
 481          result = socket.gets
 482          raise_on_error_response! result
 483          result
 484        end
 485      end
 486    end
 487 
 488    ##
 489    # Flush the cache from all memcache servers.
 490    # A non-zero value for +delay+ will ensure that the flush
 491    # is propogated slowly through your memcached server farm.
 492    # The Nth server will be flushed N*delay seconds from now,
 493    # asynchronously so this method returns quickly.
 494    # This prevents a huge database spike due to a total
 495    # flush all at once.
 496 
 497    def flush_all(delay=0)
 498      raise MemCacheError, 'No active servers' unless active?
 499      raise MemCacheError, "Update of readonly cache" if @readonly
 500 
 501      begin
 502        delay_time = 0
 503        @servers.each do |server|
 504          with_socket_management(server) do |socket|
 505            logger.debug { "flush_all #{delay_time} on #{server}" } if logger
 506            if delay == 0 # older versions of memcached will fail silently otherwise
 507              socket.write "flush_all#{noreply}\r\n"
 508            else
 509              socket.write "flush_all #{delay_time}#{noreply}\r\n"
 510            end
 511            break nil if @no_reply
 512            result = socket.gets
 513            raise_on_error_response! result
 514            result
 515          end
 516          delay_time += delay
 517        end
 518      rescue IndexError => err
 519        handle_error nil, err
 520      end
 521    end
 522 
 523    ##
 524    # Reset the connection to all memcache servers.  This should be called if
 525    # there is a problem with a cache lookup that might have left the connection
 526    # in a corrupted state.
 527 
 528    def reset
 529      @servers.each { |server| server.close }
 530    end
 531 
 532    ##
 533    # Returns statistics for each memcached server.  An explanation of the
 534    # statistics can be found in the memcached docs:
 535    #
 536    # http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt
 537    #
 538    # Example:
 539    #
 540    #   >> pp CACHE.stats
 541    #   {"localhost:11211"=>
 542    #     {"bytes"=>4718,
 543    #      "pid"=>20188,
 544    #      "connection_structures"=>4,
 545    #      "time"=>1162278121,
 546    #      "pointer_size"=>32,
 547    #      "limit_maxbytes"=>67108864,
 548    #      "cmd_get"=>14532,
 549    #      "version"=>"1.2.0",
 550    #      "bytes_written"=>432583,
 551    #      "cmd_set"=>32,
 552    #      "get_misses"=>0,
 553    #      "total_connections"=>19,
 554    #      "curr_connections"=>3,
 555    #      "curr_items"=>4,
 556    #      "uptime"=>1557,
 557    #      "get_hits"=>14532,
 558    #      "total_items"=>32,
 559    #      "rusage_system"=>0.313952,
 560    #      "rusage_user"=>0.119981,
 561    #      "bytes_read"=>190619}}
 562    #   => nil
 563 
 564    def stats
 565      raise MemCacheError, "No active servers" unless active?
 566      server_stats = {}
 567 
 568      @servers.each do |server|
 569        next unless server.alive?
 570 
 571        with_socket_management(server) do |socket|
 572          value = nil
 573          socket.write "stats\r\n"
 574          stats = {}
 575          while line = socket.gets do
 576            raise_on_error_response! line
 577            break if line == "END\r\n"
 578            if line =~ /\ASTAT ([\S]+) ([\w\.\:]+)/ then
 579              name, value = $1, $2
 580              stats[name] = case name
 581                            when 'version'
 582                              value
 583                            when 'rusage_user', 'rusage_system' then
 584                              seconds, microseconds = value.split(/:/, 2)
 585                              microseconds ||= 0
 586                              Float(seconds) + (Float(microseconds) / 1_000_000)
 587                            else
 588                              if value =~ /\A\d+\Z/ then
 589                                value.to_i
 590                              else
 591                                value
 592                              end
 593                            end
 594            end
 595          end
 596          server_stats["#{server.host}:#{server.port}"] = stats
 597        end
 598      end
 599 
 600      raise MemCacheError, "No active servers" if server_stats.empty?
 601      server_stats
 602    end
 603 
 604    ##
 605    # Shortcut to get a value from the cache.
 606 
 607    alias [] get
 608 
 609    ##
 610    # Shortcut to save a value in the cache.  This method does not set an
 611    # expiration on the entry.  Use set to specify an explicit expiry.
 612 
 613    def []=(key, value)
 614      set key, value
 615    end
 616 
 617    protected unless $TESTING
 618 
 619    ##
 620    # Create a key for the cache, incorporating the namespace qualifier if
 621    # requested.
 622 
 623    def make_cache_key(key)
 624      if namespace.nil? then
 625        key
 626      else
 627        "#{@namespace}:#{key}"
 628      end
 629    end
 630 
 631    ##
 632    # Returns an interoperable hash value for +key+.  (I think, docs are
 633    # sketchy for down servers).
 634 
 635    def hash_for(key)
 636      Zlib.crc32(key)
 637    end
 638 
 639    ##
 640    # Pick a server to handle the request based on a hash of the key.
 641 
 642    def get_server_for_key(key, options = {})
 643      raise ArgumentError, "illegal character in key #{key.inspect}" if
 644        key =~ /\s/
 645      raise ArgumentError, "key too long #{key.inspect}" if key.length > 250
 646      raise MemCacheError, "No servers available" if @servers.empty?
 647      return @servers.first if @servers.length == 1
 648 
 649      hkey = hash_for(key)
 650 
 651      20.times do |try|
 652        entryidx = Continuum.binary_search(@continuum, hkey)
 653        server = @continuum[entryidx].server
 654        return server if server.alive?
 655        break unless failover
 656        hkey = hash_for "#{try}#{key}"
 657      end
 658      
 659      raise MemCacheError, "No servers available"
 660    end
 661 
 662    ##
 663    # Performs a raw decr for +cache_key+ from +server+.  Returns nil if not
 664    # found.
 665 
 666    def cache_decr(server, cache_key, amount)
 667      with_socket_management(server) do |socket|
 668        socket.write "decr #{cache_key} #{amount}#{noreply}\r\n"
 669        break nil if @no_reply
 670        text = socket.gets
 671        raise_on_error_response! text
 672        return nil if text == "NOT_FOUND\r\n"
 673        return text.to_i
 674      end
 675    end
 676 
 677    ##
 678    # Fetches the raw data for +cache_key+ from +server+.  Returns nil on cache
 679    # miss.
 680 
 681    def cache_get(server, cache_key)
 682      with_socket_management(server) do |socket|
 683        socket.write "get #{cache_key}\r\n"
 684        keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n"
 685 
 686        if keyline.nil? then
 687          server.close
 688          raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
 689        end
 690 
 691        raise_on_error_response! keyline
 692        return nil if keyline == "END\r\n"
 693 
 694        unless keyline =~ /(\d+)\r/ then
 695          server.close
 696          raise MemCacheError, "unexpected response #{keyline.inspect}"
 697        end
 698        value = socket.read $1.to_i
 699        socket.read 2 # "\r\n"
 700        socket.gets   # "END\r\n"
 701        return value
 702      end
 703    end
 704 
 705    def gets(key, raw = false)
 706      with_server(key) do |server, cache_key|
 707        logger.debug { "gets #{key} from #{server.inspect}" } if logger
 708        result = with_socket_management(server) do |socket|
 709          socket.write "gets #{cache_key}\r\n"
 710          keyline = socket.gets # "VALUE <key> <flags> <bytes> <cas token>\r\n"
 711 
 712          if keyline.nil? then
 713            server.close
 714            raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
 715          end
 716 
 717          raise_on_error_response! keyline
 718          return nil if keyline == "END\r\n"
 719 
 720          unless keyline =~ /(\d+) (\w+)\r/ then
 721            server.close
 722            raise MemCacheError, "unexpected response #{keyline.inspect}"
 723          end
 724          value = socket.read $1.to_i
 725          socket.read 2 # "\r\n"
 726          socket.gets   # "END\r\n"
 727          [value, $2]
 728        end
 729        result[0] = Marshal.load result[0] unless raw
 730        result
 731      end
 732    rescue TypeError => err
 733      handle_error nil, err
 734    end
 735 
 736 
 737    ##
 738    # Fetches +cache_keys+ from +server+ using a multi-get.
 739 
 740    def cache_get_multi(server, cache_keys)
 741      with_socket_management(server) do |socket|
 742        values = {}
 743        socket.write "get #{cache_keys}\r\n"
 744 
 745        while keyline = socket.gets do
 746          return values if keyline == "END\r\n"
 747          raise_on_error_response! keyline
 748 
 749          unless keyline =~ /\AVALUE (.+) (.+) (.+)/ then
 750            server.close
 751            raise MemCacheError, "unexpected response #{keyline.inspect}"
 752          end
 753 
 754          key, data_length = $1, $3
 755          values[$1] = socket.read data_length.to_i
 756          socket.read(2) # "\r\n"
 757        end
 758 
 759        server.close
 760        raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too
 761      end
 762    end
 763 
 764    ##
 765    # Performs a raw incr for +cache_key+ from +server+.  Returns nil if not
 766    # found.
 767 
 768    def cache_incr(server, cache_key, amount)
 769      with_socket_management(server) do |socket|
 770        socket.write "incr #{cache_key} #{amount}#{noreply}\r\n"
 771        break nil if @no_reply
 772        text = socket.gets
 773        raise_on_error_response! text
 774        return nil if text == "NOT_FOUND\r\n"
 775        return text.to_i
 776      end
 777    end
 778 
 779    ##
 780    # Gets or creates a socket connected to the given server, and yields it
 781    # to the block, wrapped in a mutex synchronization if @multithread is true.
 782    #
 783    # If a socket error (SocketError, SystemCallError, IOError) or protocol error
 784    # (MemCacheError) is raised by the block, closes the socket, attempts to
 785    # connect again, and retries the block (once).  If an error is again raised,
 786    # reraises it as MemCacheError.
 787    #
 788    # If unable to connect to the server (or if in the reconnect wait period),
 789    # raises MemCacheError.  Note that the socket connect code marks a server
 790    # dead for a timeout period, so retrying does not apply to connection attempt
 791    # failures (but does still apply to unexpectedly lost connections etc.).
 792 
 793    def with_socket_management(server, &block)
 794      check_multithread_status!
 795 
 796      @mutex.lock if @multithread
 797      retried = false
 798 
 799      begin
 800        socket = server.socket
 801 
 802        # Raise an IndexError to show this server is out of whack. If were inside
 803        # a with_server block, we'll catch it and attempt to restart the operation.
 804 
 805        raise IndexError, "No connection to server (#{server.status})" if socket.nil?
 806 
 807        block.call(socket)
 808 
 809      rescue SocketError, Errno::EAGAIN, Timeout::Error => err
 810        logger.warn { "Socket failure: #{err.message}" } if logger
 811        server.mark_dead(err)
 812        handle_error(server, err)
 813 
 814      rescue MemCacheError, SystemCallError, IOError => err
 815        logger.warn { "Generic failure: #{err.class.name}: #{err.message}" } if logger
 816        handle_error(server, err) if retried || socket.nil?
 817        retried = true
 818        retry
 819      end
 820    ensure
 821      @mutex.unlock if @multithread
 822    end
 823 
 824    def with_server(key)
 825      retried = false
 826      begin
 827        server, cache_key = request_setup(key)
 828        yield server, cache_key
 829      rescue IndexError => e
 830        logger.warn { "Server failed: #{e.class.name}: #{e.message}" } if logger
 831        if !retried && @servers.size > 1
 832          logger.info { "Connection to server #{server.inspect} DIED! Retrying operation..." } if logger
 833          retried = true
 834          retry
 835        end
 836        handle_error(nil, e)
 837      end
 838    end
 839 
 840    ##
 841    # Handles +error+ from +server+.
 842 
 843    def handle_error(server, error)
 844      raise error if error.is_a?(MemCacheError)
 845      server.close if server
 846      new_error = MemCacheError.new error.message
 847      new_error.set_backtrace error.backtrace
 848      raise new_error
 849    end
 850 
 851    def noreply
 852      @no_reply ? ' noreply' : ''
 853    end
 854 
 855    ##
 856    # Performs setup for making a request with +key+ from memcached.  Returns
 857    # the server to fetch the key from and the complete key to use.
 858 
 859    def request_setup(key)
 860      raise MemCacheError, 'No active servers' unless active?
 861      cache_key = make_cache_key key
 862      server = get_server_for_key cache_key
 863      return server, cache_key
 864    end
 865 
 866    def raise_on_error_response!(response)
 867      if response =~ /\A(?:CLIENT_|SERVER_)?ERROR(.*)/
 868        raise MemCacheError, $1.strip
 869      end
 870    end
 871 
 872    def create_continuum_for(servers)
 873      total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
 874      continuum = []
 875 
 876      servers.each do |server|
 877        entry_count_for(server, servers.size, total_weight).times do |idx|
 878          hash = Digest::SHA1.hexdigest("#{server.host}:#{server.port}:#{idx}")
 879          value = Integer("0x#{hash[0..7]}")
 880          continuum << Continuum::Entry.new(value, server)
 881        end
 882      end
 883 
 884      continuum.sort { |a, b| a.value <=> b.value }
 885    end
 886 
 887    def entry_count_for(server, total_servers, total_weight)
 888      ((total_servers * Continuum::POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor
 889    end
 890 
 891    def check_multithread_status!
 892      return if @multithread
 893 
 894      if Thread.current[:memcache_client] != self.object_id
 895        raise MemCacheError, <<-EOM
 896          You are accessing this memcache-client instance from multiple threads but have not enabled multithread support.
 897          Normally:  MemCache.new(['localhost:11211'], :multithread => true)
 898          In Rails:  config.cache_store = [:mem_cache_store, 'localhost:11211', { :multithread => true }]
 899        EOM
 900      end
 901    end
 902 
 903    ##
 904    # This class represents a memcached server instance.
 905 
 906    class Server
 907 
 908      ##
 909      # The amount of time to wait before attempting to re-establish a
 910      # connection with a server that is marked dead.
 911 
 912      RETRY_DELAY = 30.0
 913 
 914      ##
 915      # The host the memcached server is running on.
 916 
 917      attr_reader :host
 918 
 919      ##
 920      # The port the memcached server is listening on.
 921 
 922      attr_reader :port
 923 
 924      ##
 925      # The weight given to the server.
 926 
 927      attr_reader :weight
 928 
 929      ##
 930      # The time of next retry if the connection is dead.
 931 
 932      attr_reader :retry
 933 
 934      ##
 935      # A text status string describing the state of the server.
 936 
 937      attr_reader :status
 938 
 939      attr_reader :logger
 940 
 941      ##
 942      # Create a new MemCache::Server object for the memcached instance
 943      # listening on the given host and port, weighted by the given weight.
 944 
 945      def initialize(memcache, host, port = DEFAULT_PORT, weight = DEFAULT_WEIGHT)
 946        raise ArgumentError, "No host specified" if host.nil? or host.empty?
 947        raise ArgumentError, "No port specified" if port.nil? or port.to_i.zero?
 948 
 949        @host   = host
 950        @port   = port.to_i
 951        @weight = weight.to_i
 952 
 953        @sock   = nil
 954        @retry  = nil
 955        @status = 'NOT CONNECTED'
 956        @timeout = memcache.timeout
 957        @logger = memcache.logger
 958      end
 959 
 960      ##
 961      # Return a string representation of the server object.
 962 
 963      def inspect
 964        "<MemCache::Server: %s:%d [%d] (%s)>" % [@host, @port, @weight, @status]
 965      end
 966 
 967      ##
 968      # Check whether the server connection is alive.  This will cause the
 969      # socket to attempt to connect if it isn't already connected and or if
 970      # the server was previously marked as down and the retry time has
 971      # been exceeded.
 972 
 973      def alive?
 974        !!socket
 975      end
 976 
 977      ##
 978      # Try to connect to the memcached server targeted by this object.
 979      # Returns the connected socket object on success or nil on failure.
 980 
 981      def socket
 982        return @sock if @sock and not @sock.closed?
 983 
 984        @sock = nil
 985 
 986        # If the host was dead, don't retry for a while.
 987        return if @retry and @retry > Time.now
 988 
 989        # Attempt to connect if not already connected.
 990        begin
 991          @sock = connect_to(@host, @port, @timeout)
 992          @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
 993          @retry  = nil
 994          @status = 'CONNECTED'
 995        rescue SocketError, SystemCallError, IOError => err
 996          logger.warn { "Unable to open socket: #{err.class.name}, #{err.message}" } if logger
 997          mark_dead err
 998        end
 999 
1000        return @sock
1001      end
1002 
1003      def connect_to(host, port, timeout=nil)
1004        io = MemCache::BufferedIO.new(TCPSocket.new(host, port))
1005        io.read_timeout = timeout
1006        io
1007      end
1008 
1009      ##
1010      # Close the connection to the memcached server targeted by this
1011      # object.  The server is not considered dead.
1012 
1013      def close
1014        @sock.close if @sock && !@sock.closed?
1015        @sock   = nil
1016        @retry  = nil
1017        @status = "NOT CONNECTED"
1018      end
1019 
1020      ##
1021      # Mark the server as dead and close its socket.
1022 
1023      def mark_dead(error)
1024        @sock.close if @sock && !@sock.closed?
1025        @sock   = nil
1026        @retry  = Time.now + RETRY_DELAY
1027 
1028        reason = "#{error.class.name}: #{error.message}"
1029        @status = sprintf "%s:%s DEAD (%s), will retry at %s", @host, @port, reason, @retry
1030        @logger.info { @status } if @logger
1031      end
1032 
1033    end
1034 
1035    ##
1036    # Base MemCache exception class.
1037 
1038    class MemCacheError < RuntimeError; end
1039 
1040    class BufferedIO < Net::BufferedIO # :nodoc:
1041      BUFSIZE = 1024 * 16
1042 
1043      # An implementation similar to this is in *trunk* for 1.9.  When it
1044      # gets released, this method can be removed when using 1.9
1045      def rbuf_fill
1046        begin
1047          @rbuf << @io.read_nonblock(BUFSIZE)
1048        rescue Errno::EWOULDBLOCK
1049          retry unless @read_timeout
1050          if IO.select([@io], nil, nil, @read_timeout)
1051            retry
1052          else
1053            raise Timeout::Error, 'IO timeout'
1054          end
1055        end
1056      end
1057 
1058      def setsockopt *args
1059        @io.setsockopt *args
1060      end
1061 
1062      def gets
1063        readuntil("\n")
1064      end
1065    end
1066 
1067  end
1068 
1069  module Continuum
1070    POINTS_PER_SERVER = 160 # this is the default in libmemcached
1071 
1072    # Find the closest index in Continuum with value <= the given value
1073    def self.binary_search(ary, value, &block)
1074      upper = ary.size - 1
1075      lower = 0
1076      idx = 0
1077 
1078      while(lower <= upper) do
1079        idx = (lower + upper) / 2
1080        comp = ary[idx].value <=> value
1081 
1082        if comp == 0
1083          return idx
1084        elsif comp > 0
1085          upper = idx - 1
1086        else
1087          lower = idx + 1
1088        end
1089      end
1090      return upper
1091    end
1092 
1093    class Entry
1094      attr_reader :value
1095      attr_reader :server
1096 
1097      def initialize(val, srv)
1098        @value = val
1099        @server = srv
1100      end
1101 
1102      def inspect
1103        "<#{value}, #{server.host}:#{server.port}>"
1104      end
1105    end
1106 
1107  end