1 $TESTING = defined?($TESTING) && $TESTING
2
3 require 'socket'
4 require 'thread'
5 require 'zlib'
6 require 'digest/sha1'
7 require 'net/protocol'
8
9 ##
10 # A Ruby client library for memcached.
11 #
12
13 class MemCache
14
##
# The version of MemCache you are using.

VERSION = '1.7.4'

##
# Default options for the cache object. Merged under any user-supplied
# options in MemCache#initialize.

DEFAULT_OPTIONS = {
  :namespace => nil,
  :readonly => false,
  :multithread => true,
  :failover => true,
  :timeout => 0.5,
  :logger => nil,
  :no_reply => false,
}

##
# Default memcached port.

DEFAULT_PORT = 11211

##
# Default memcached server weight (relative share of the hash continuum).

DEFAULT_WEIGHT = 1

##
# The namespace for this instance; prepended to every key when set.

attr_reader :namespace

##
# The multithread setting for this instance. When true, socket access is
# wrapped in a Mutex.

attr_reader :multithread

##
# The servers this client talks to. Play at your own peril.

attr_reader :servers

##
# Socket timeout limit with this client, defaults to 0.5 sec.
# Set to nil to disable timeouts.

attr_reader :timeout

##
# Should the client try to failover to another server if the
# first server is down? Defaults to true.

attr_reader :failover

##
# Log debug/info/warn/error to the given Logger, defaults to nil.

attr_reader :logger

##
# Don't send or look for a reply from the memcached server for write operations.
# Please note this feature only works in memcached 1.2.5 and later. Earlier
# versions will reply with "ERROR".
attr_reader :no_reply
80
##
# Accepts a list of +servers+ and a list of +opts+. +servers+ may be
# omitted. See +servers=+ for acceptable server list arguments.
#
# Valid options for +opts+ are:
#
# [:namespace]   Prepends this value to all keys added or retrieved.
# [:readonly]    Raises an exception on cache writes when true.
# [:multithread] Wraps cache access in a Mutex for thread safety. Defaults to true.
# [:failover]    Should the client try to failover to another server if the
#                first server is down? Defaults to true.
# [:timeout]     Time to use as the socket read timeout. Defaults to 0.5 sec,
#                set to nil to disable timeouts (this is a major performance penalty in Ruby 1.8,
#                "gem install SystemTimer' to remove most of the penalty).
# [:logger]      Logger to use for info/debug output, defaults to nil
# [:no_reply]    Don't bother looking for a reply for write operations (i.e. they
#                become 'fire and forget'), memcached 1.2.5 and later only, speeds up
#                set/add/delete/incr/decr significantly.
#
# Other options are ignored.

def initialize(*args)
  servers = []
  opts = {}

  case args.length
  when 0 then # no arguments -- all defaults
  when 1 then
    first = args.first
    case first
    when Hash then opts = first
    when Array then servers = first
    when String then servers = [first]
    else
      raise ArgumentError, 'first argument must be Array, Hash or String'
    end
  when 2 then
    servers, opts = args
  else
    raise ArgumentError, "wrong number of arguments (#{args.length} for 2)"
  end

  options = DEFAULT_OPTIONS.merge opts
  @namespace   = options[:namespace]
  @readonly    = options[:readonly]
  @multithread = options[:multithread]
  @timeout     = options[:timeout]
  @failover    = options[:failover]
  @logger      = options[:logger]
  @no_reply    = options[:no_reply]
  @mutex       = Mutex.new if @multithread

  logger.info { "memcache-client #{VERSION} #{Array(servers).inspect}" } if logger

  # Remember the owning thread so single-threaded misuse can be detected later.
  Thread.current[:memcache_client] = object_id unless @multithread

  self.servers = servers
end
138
139 ##
140 # Returns a string representation of the cache object.
141
142 def inspect
143 "<MemCache: %d servers, ns: %p, ro: %p>" %
144 [@servers.length, @namespace, @readonly]
145 end
146
##
# Returns whether there is at least one active server for the object.

def active?
  !@servers.empty?
end
153
##
# Returns whether or not the cache object was created read only.
# Write operations raise MemCacheError when this is true.

def readonly?
  @readonly
end
160
##
# Set the servers that the requests will be distributed between. Entries
# can be either strings of the form "hostname:port" or
# "hostname:port:weight" or MemCache::Server objects.
#
def servers=(servers)
  # Build a Server object for every string entry; anything else is
  # assumed to already be a server-like object and passed through.
  @servers = Array(servers).map do |svr|
    if svr.is_a?(String)
      host, port, weight = svr.split ':', 3
      port ||= DEFAULT_PORT
      weight ||= DEFAULT_WEIGHT
      Server.new self, host, port, weight
    else
      svr
    end
  end

  logger.debug { "Servers now: #{@servers.inspect}" } if logger

  # There's no point in building a hash continuum for a single server.
  @continuum = create_continuum_for(@servers) if @servers.size > 1

  @servers
end
187
##
# Decrements the value for +key+ by +amount+ and returns the new value.
# +key+ must already exist. If the current value is not an integer, it is
# assumed to be 0. The value can not be decremented below 0.
# TypeError from a bad key is converted to MemCacheError by handle_error.

def decr(key, amount = 1)
  raise MemCacheError, "Update of readonly cache" if @readonly
  with_server(key) do |server, cache_key|
    cache_decr server, cache_key, amount
  end
rescue TypeError => err
  handle_error nil, err
end
201
##
# Retrieves +key+ from memcache. If +raw+ is false, the value will be
# unmarshalled.

def get(key, raw = false)
  with_server(key) do |server, cache_key|
    logger.debug { "get #{key} from #{server.inspect}" } if logger
    data = cache_get server, cache_key
    # Explicit +return+ exits the method, not just the block.
    return nil if data.nil?
    return raw ? data : Marshal.load(data)
  end
rescue TypeError => err
  handle_error nil, err
end
217
##
# Performs a +get+ with the given +key+. If
# the value does not exist and a block was given,
# the block will be called and the result saved via +add+.
#
# If you do not provide a block, using this
# method is the same as using +get+.
#
def fetch(key, expiry = 0, raw = false)
  value = get(key, raw)
  # Cache hit (or no block to compute a fallback) -- done.
  return value unless value.nil? && block_given?

  value = yield
  add(key, value, expiry, raw)
  value
end
236
##
# Retrieves multiple values from memcached in parallel, if possible.
#
# The memcached protocol supports the ability to retrieve multiple
# keys in a single request. Pass in an array of keys to this method
# and it will:
#
# 1. map the key to the appropriate memcached server
# 2. send a single request to each server that has one or more key values
#
# Returns a hash of values.
#
#   cache["a"] = 1
#   cache["b"] = 2
#   cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 }
#
# Note that get_multi assumes the values are marshalled.

def get_multi(*keys)
  raise MemCacheError, 'No active servers' unless active?

  keys.flatten!
  cache_keys = {}
  server_keys = Hash.new { |hash, server| hash[server] = [] }

  # Group the requested keys by the server responsible for each one,
  # remembering the namespaced-key -> original-key mapping.
  keys.each do |key|
    server, cache_key = request_setup key
    cache_keys[cache_key] = key
    server_keys[server] << cache_key
  end

  results = {}

  server_keys.each do |server, keys_for_server|
    begin
      values = cache_get_multi server, keys_for_server.join(' ')
      values.each do |cache_key, data|
        results[cache_keys[cache_key]] = Marshal.load data
      end
    rescue IndexError => e
      # A dead server just means fewer results; try the other servers.
      logger.warn { "Unable to retrieve #{keys_for_server.size} elements from #{server.inspect}: #{e.message}"} if logger
    end
  end

  results
rescue TypeError => err
  handle_error nil, err
end
289
##
# Increments the value for +key+ by +amount+ and returns the new value.
# +key+ must already exist. If the current value is not an integer, it is
# assumed to be 0.
# TypeError from a bad key is converted to MemCacheError by handle_error.

def incr(key, amount = 1)
  raise MemCacheError, "Update of readonly cache" if @readonly
  with_server(key) do |server, cache_key|
    cache_incr server, cache_key, amount
  end
rescue TypeError => err
  handle_error nil, err
end
303
##
# Add +key+ to the cache with value +value+ that expires in +expiry+
# seconds. If +raw+ is true, +value+ will not be Marshalled.
#
# Warning: Readers should not call this method in the event of a cache miss;
# see MemCache#add.

ONE_MB = 1024 * 1024

def set(key, value, expiry = 0, raw = false)
  raise MemCacheError, "Update of readonly cache" if @readonly
  with_server(key) do |server, cache_key|

    value = Marshal.dump value unless raw
    logger.debug { "set #{key} to #{server.inspect}: #{value.to_s.size}" } if logger

    data = value.to_s
    raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if data.size > ONE_MB

    command = "set #{cache_key} 0 #{expiry} #{data.size}#{noreply}\r\n#{data}\r\n"

    with_socket_management(server) do |socket|
      socket.write command
      break nil if @no_reply
      reply = socket.gets
      raise_on_error_response! reply

      # A nil read means the connection dropped mid-request.
      if reply.nil?
        server.close
        raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
      end

      reply
    end
  end
end
339
##
# "cas" is a check and set operation which means "store this data but
# only if no one else has updated since I last fetched it." This can
# be used as a form of optimistic locking.
#
# Works in block form like so:
#   cache.cas('some-key') do |value|
#     value + 1
#   end
#
# Returns:
# +nil+ if the value was not found on the memcached server.
# +STORED+ if the value was updated successfully
# +EXISTS+ if the value was updated by someone else since last fetch

def cas(key, expiry = 0, raw = false)
  raise MemCacheError, "Update of readonly cache" if @readonly
  raise MemCacheError, "A block is required" unless block_given?

  (value, token) = gets(key, raw)
  return nil unless value
  updated = yield value

  with_server(key) do |server, cache_key|

    # Bug fix: previously, when +raw+ was true the *original* fetched
    # value was written back and the block's result was discarded.
    # Always store the updated value, marshalling it unless raw.
    value = raw ? updated : Marshal.dump(updated)
    logger.debug { "cas #{key} to #{server.inspect}: #{value.to_s.size}" } if logger
    command = "cas #{cache_key} 0 #{expiry} #{value.to_s.size} #{token}#{noreply}\r\n#{value}\r\n"

    with_socket_management(server) do |socket|
      socket.write command
      break nil if @no_reply
      result = socket.gets
      raise_on_error_response! result

      # A nil read means the connection dropped mid-request.
      if result.nil?
        server.close
        raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
      end

      result
    end
  end
end
384
##
# Add +key+ to the cache with value +value+ that expires in +expiry+
# seconds, but only if +key+ does not already exist in the cache.
# If +raw+ is true, +value+ will not be Marshalled.
#
# Readers should call this method in the event of a cache miss, not
# MemCache#set.

def add(key, value, expiry = 0, raw = false)
  raise MemCacheError, "Update of readonly cache" if @readonly
  with_server(key) do |server, cache_key|
    value = Marshal.dump value unless raw
    logger.debug { "add #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger

    with_socket_management(server) do |socket|
      socket.write "add #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
      break nil if @no_reply
      reply = socket.gets
      raise_on_error_response! reply
      reply
    end
  end
end
409
##
# Add +key+ to the cache with value +value+ that expires in +expiry+
# seconds, but only if +key+ already exists in the cache.
# If +raw+ is true, +value+ will not be Marshalled.
def replace(key, value, expiry = 0, raw = false)
  raise MemCacheError, "Update of readonly cache" if @readonly
  with_server(key) do |server, cache_key|
    value = Marshal.dump value unless raw
    logger.debug { "replace #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger

    with_socket_management(server) do |socket|
      socket.write "replace #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
      break nil if @no_reply
      reply = socket.gets
      raise_on_error_response! reply
      reply
    end
  end
end
430
##
# Append - 'add this data to an existing key after existing data'
# Please note the value is always passed to memcached as raw since it
# doesn't make a lot of sense to concatenate marshalled data together.
def append(key, value)
  raise MemCacheError, "Update of readonly cache" if @readonly
  with_server(key) do |server, cache_key|
    logger.debug { "append #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger

    with_socket_management(server) do |socket|
      socket.write "append #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
      break nil if @no_reply
      reply = socket.gets
      raise_on_error_response! reply
      reply
    end
  end
end
450
##
# Prepend - 'add this data to an existing key before existing data'
# Please note the value is always passed to memcached as raw since it
# doesn't make a lot of sense to concatenate marshalled data together.
def prepend(key, value)
  raise MemCacheError, "Update of readonly cache" if @readonly
  with_server(key) do |server, cache_key|
    logger.debug { "prepend #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger

    with_socket_management(server) do |socket|
      socket.write "prepend #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n"
      break nil if @no_reply
      reply = socket.gets
      raise_on_error_response! reply
      reply
    end
  end
end
470
##
# Removes +key+ from the cache in +expiry+ seconds.

def delete(key, expiry = 0)
  raise MemCacheError, "Update of readonly cache" if @readonly
  with_server(key) do |server, cache_key|
    with_socket_management(server) do |socket|
      logger.debug { "delete #{cache_key} on #{server}" } if logger
      socket.write "delete #{cache_key} #{expiry}#{noreply}\r\n"
      break nil if @no_reply
      reply = socket.gets
      raise_on_error_response! reply
      reply
    end
  end
end
487
##
# Flush the cache from all memcache servers.
# A non-zero value for +delay+ will ensure that the flush
# is propogated slowly through your memcached server farm.
# The Nth server will be flushed N*delay seconds from now,
# asynchronously so this method returns quickly.
# This prevents a huge database spike due to a total
# flush all at once.

def flush_all(delay = 0)
  raise MemCacheError, 'No active servers' unless active?
  raise MemCacheError, "Update of readonly cache" if @readonly

  delay_time = 0
  @servers.each do |server|
    with_socket_management(server) do |socket|
      logger.debug { "flush_all #{delay_time} on #{server}" } if logger
      command = if delay == 0 # older versions of memcached will fail silently otherwise
                  "flush_all#{noreply}\r\n"
                else
                  "flush_all #{delay_time}#{noreply}\r\n"
                end
      socket.write command
      break nil if @no_reply
      reply = socket.gets
      raise_on_error_response! reply
      reply
    end
    delay_time += delay
  end
rescue IndexError => err
  handle_error nil, err
end
522
##
# Reset the connection to all memcache servers. This should be called if
# there is a problem with a cache lookup that might have left the connection
# in a corrupted state.

def reset
  @servers.each(&:close)
end
531
##
# Returns statistics for each memcached server. An explanation of the
# statistics can be found in the memcached docs:
#
# http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt
#
# Example (abridged):
#
#   >> pp CACHE.stats
#   {"localhost:11211"=>
#     {"bytes"=>4718,
#      "pid"=>20188,
#      "version"=>"1.2.0",
#      "rusage_system"=>0.313952,
#      "rusage_user"=>0.119981,
#      ...}}
#
# Dead servers are skipped; raises MemCacheError if no server answered.

def stats
  raise MemCacheError, "No active servers" unless active?
  server_stats = {}

  @servers.each do |server|
    next unless server.alive?

    with_socket_management(server) do |socket|
      value = nil
      socket.write "stats\r\n"
      stats = {}
      # Read "STAT <name> <value>" lines until the END sentinel.
      while line = socket.gets do
        raise_on_error_response! line
        break if line == "END\r\n"
        if line =~ /\ASTAT ([\S]+) ([\w\.\:]+)/ then
          name, value = $1, $2
          stats[name] = case name
                        when 'version'
                          # Keep the version as a plain string.
                          value
                        when 'rusage_user', 'rusage_system' then
                          # rusage values arrive as "seconds:microseconds";
                          # convert to a single Float of seconds.
                          seconds, microseconds = value.split(/:/, 2)
                          microseconds ||= 0
                          Float(seconds) + (Float(microseconds) / 1_000_000)
                        else
                          # Numeric-looking stats become Integers, the
                          # rest stay strings.
                          if value =~ /\A\d+\Z/ then
                            value.to_i
                          else
                            value
                          end
                        end
        end
      end
      server_stats["#{server.host}:#{server.port}"] = stats
    end
  end

  raise MemCacheError, "No active servers" if server_stats.empty?
  server_stats
end
603
##
# Shortcut to get a value from the cache.

alias [] get

##
# Shortcut to save a value in the cache. This method does not set an
# expiration on the entry. Use set to specify an explicit expiry.

def []=(key, value)
  set key, value
end

# Everything below is an implementation detail; it stays callable from
# outside only when the test suite sets $TESTING.
protected unless $TESTING
618
##
# Create a key for the cache, incorporating the namespace qualifier if
# requested.

def make_cache_key(key)
  namespace.nil? ? key : "#{@namespace}:#{key}"
end
630
##
# Returns an interoperable hash value for +key+ (CRC32, matching what
# other memcached clients use for key distribution).

def hash_for(key)
  Zlib.crc32 key
end
638
##
# Pick a server to handle the request based on a hash of the key.
# Walks the continuum, rehashing up to 20 times to fail over to a
# live server when +failover+ is enabled.

def get_server_for_key(key, options = {})
  raise ArgumentError, "illegal character in key #{key.inspect}" if key =~ /\s/
  raise ArgumentError, "key too long #{key.inspect}" if key.length > 250
  raise MemCacheError, "No servers available" if @servers.empty?
  return @servers.first if @servers.length == 1

  hkey = hash_for(key)

  20.times do |try|
    entry = @continuum[Continuum.binary_search(@continuum, hkey)]
    return entry.server if entry.server.alive?
    break unless failover
    # Perturb the hash with the attempt number to probe another point.
    hkey = hash_for "#{try}#{key}"
  end

  raise MemCacheError, "No servers available"
end
661
##
# Performs a raw decr for +cache_key+ from +server+. Returns nil if not
# found.

def cache_decr(server, cache_key, amount)
  with_socket_management(server) do |socket|
    socket.write "decr #{cache_key} #{amount}#{noreply}\r\n"
    break nil if @no_reply
    response = socket.gets
    raise_on_error_response! response
    # Explicit +return+ exits the method, not just the block.
    return nil if response == "NOT_FOUND\r\n"
    return response.to_i
  end
end
676
##
# Fetches the raw data for +cache_key+ from +server+. Returns nil on cache
# miss. Raises MemCacheError on a dropped connection or a malformed reply.

def cache_get(server, cache_key)
  with_socket_management(server) do |socket|
    socket.write "get #{cache_key}\r\n"
    keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n"

    # A nil read means the connection dropped mid-request.
    if keyline.nil? then
      server.close
      raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
    end

    raise_on_error_response! keyline
    return nil if keyline == "END\r\n"

    # Only the byte count is extracted; the rest of the header is ignored.
    unless keyline =~ /(\d+)\r/ then
      server.close
      raise MemCacheError, "unexpected response #{keyline.inspect}"
    end
    value = socket.read $1.to_i
    socket.read 2 # "\r\n"
    socket.gets # "END\r\n"
    # +return+ here exits cache_get itself, not just the block.
    return value
  end
end
704
##
# Fetches the value and CAS token for +key+ via the "gets" command.
# Returns a two-element array of [value, token], or nil on cache miss.
# If +raw+ is false the value is unmarshalled in place.

def gets(key, raw = false)
  with_server(key) do |server, cache_key|
    logger.debug { "gets #{key} from #{server.inspect}" } if logger
    result = with_socket_management(server) do |socket|
      socket.write "gets #{cache_key}\r\n"
      keyline = socket.gets # "VALUE <key> <flags> <bytes> <cas token>\r\n"

      # A nil read means the connection dropped mid-request.
      if keyline.nil? then
        server.close
        raise MemCacheError, "lost connection to #{server.host}:#{server.port}"
      end

      raise_on_error_response! keyline
      # +return+ exits the whole method on a miss.
      return nil if keyline == "END\r\n"

      # Captures the byte count ($1) and the CAS token ($2).
      unless keyline =~ /(\d+) (\w+)\r/ then
        server.close
        raise MemCacheError, "unexpected response #{keyline.inspect}"
      end
      value = socket.read $1.to_i
      socket.read 2 # "\r\n"
      socket.gets # "END\r\n"
      [value, $2]
    end
    result[0] = Marshal.load result[0] unless raw
    result
  end
rescue TypeError => err
  handle_error nil, err
end
735
736
737 ##
738 # Fetches +cache_keys+ from +server+ using a multi-get.
739
740 def cache_get_multi(server, cache_keys)
741 with_socket_management(server) do |socket|
742 values = {}
743 socket.write "get #{cache_keys}\r\n"
744
745 while keyline = socket.gets do
746 return values if keyline == "END\r\n"
747 raise_on_error_response! keyline
748
749 unless keyline =~ /\AVALUE (.+) (.+) (.+)/ then
750 server.close
751 raise MemCacheError, "unexpected response #{keyline.inspect}"
752 end
753
754 key, data_length = $1, $3
755 values[$1] = socket.read data_length.to_i
756 socket.read(2) # "\r\n"
757 end
758
759 server.close
760 raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too
761 end
762 end
763
##
# Performs a raw incr for +cache_key+ from +server+. Returns nil if not
# found.

def cache_incr(server, cache_key, amount)
  with_socket_management(server) do |socket|
    socket.write "incr #{cache_key} #{amount}#{noreply}\r\n"
    break nil if @no_reply
    response = socket.gets
    raise_on_error_response! response
    # Explicit +return+ exits the method, not just the block.
    return nil if response == "NOT_FOUND\r\n"
    return response.to_i
  end
end
778
##
# Gets or creates a socket connected to the given server, and yields it
# to the block, wrapped in a mutex synchronization if @multithread is true.
#
# If a socket error (SocketError, SystemCallError, IOError) or protocol error
# (MemCacheError) is raised by the block, closes the socket, attempts to
# connect again, and retries the block (once). If an error is again raised,
# reraises it as MemCacheError.
#
# If unable to connect to the server (or if in the reconnect wait period),
# raises MemCacheError. Note that the socket connect code marks a server
# dead for a timeout period, so retrying does not apply to connection attempt
# failures (but does still apply to unexpectedly lost connections etc.).

def with_socket_management(server, &block)
  check_multithread_status!

  @mutex.lock if @multithread
  retried = false

  begin
    socket = server.socket

    # Raise an IndexError to show this server is out of whack. If were inside
    # a with_server block, we'll catch it and attempt to restart the operation.

    raise IndexError, "No connection to server (#{server.status})" if socket.nil?

    block.call(socket)

  rescue SocketError, Errno::EAGAIN, Timeout::Error => err
    # Hard socket failure: mark the server dead; no retry here.
    logger.warn { "Socket failure: #{err.message}" } if logger
    server.mark_dead(err)
    handle_error(server, err)

  rescue MemCacheError, SystemCallError, IOError => err
    # Possibly-transient failure: retry once, then give up and wrap the error.
    logger.warn { "Generic failure: #{err.class.name}: #{err.message}" } if logger
    handle_error(server, err) if retried || socket.nil?
    retried = true
    retry
  end
ensure
  # Always release the lock, even when an error escapes the block.
  @mutex.unlock if @multithread
end
823
##
# Resolves +key+ to a server and namespaced cache key, then yields both.
# If the chosen server turns out to be dead (signalled by IndexError from
# with_socket_management), the whole operation is retried once so a
# different server can be picked; otherwise the error is wrapped and
# re-raised as MemCacheError.

def with_server(key)
  retried = false
  begin
    server, cache_key = request_setup(key)
    yield server, cache_key
  rescue IndexError => e
    logger.warn { "Server failed: #{e.class.name}: #{e.message}" } if logger
    if !retried && @servers.size > 1
      logger.info { "Connection to server #{server.inspect} DIED! Retrying operation..." } if logger
      retried = true
      retry
    end
    handle_error(nil, e)
  end
end
839
##
# Handles +error+ from +server+: closes the server connection (when one
# is given) and re-raises the error wrapped as a MemCacheError with the
# original backtrace preserved.

def handle_error(server, error)
  raise error if error.is_a?(MemCacheError)
  server.close if server
  wrapped = MemCacheError.new error.message
  wrapped.set_backtrace error.backtrace
  raise wrapped
end
850
##
# Protocol suffix appended to write commands when :no_reply is enabled.

def noreply
  if @no_reply
    ' noreply'
  else
    ''
  end
end
854
##
# Performs setup for making a request with +key+ from memcached. Returns
# the server to fetch the key from and the complete key to use.

def request_setup(key)
  raise MemCacheError, 'No active servers' unless active?
  cache_key = make_cache_key key
  [get_server_for_key(cache_key), cache_key]
end
865
##
# Raises MemCacheError when +response+ is an ERROR / CLIENT_ERROR /
# SERVER_ERROR line, using the server's message text.

def raise_on_error_response!(response)
  return unless response =~ /\A(?:CLIENT_|SERVER_)?ERROR(.*)/
  raise MemCacheError, $1.strip
end
871
##
# Builds the sorted hash continuum for +servers+. Each server gets a
# number of points proportional to its weight; each point is derived
# from the first 8 hex digits of a SHA1 of "host:port:index".

def create_continuum_for(servers)
  total_weight = servers.inject(0) { |sum, srv| sum + srv.weight }
  continuum = []

  servers.each do |server|
    entry_count_for(server, servers.size, total_weight).times do |idx|
      digest = Digest::SHA1.hexdigest("#{server.host}:#{server.port}:#{idx}")
      point = Integer("0x#{digest[0..7]}")
      continuum << Continuum::Entry.new(point, server)
    end
  end

  continuum.sort_by { |entry| entry.value }
end
886
##
# Number of continuum points to allocate to +server+, proportional to
# its weight relative to +total_weight+.

def entry_count_for(server, total_servers, total_weight)
  share = (total_servers * Continuum::POINTS_PER_SERVER * server.weight) / Float(total_weight)
  share.floor
end
890
##
# Guards against using a single-threaded (:multithread => false) client
# from a thread other than the one that created it; raises MemCacheError
# with setup instructions when that happens. No-op when multithreading
# is enabled.

def check_multithread_status!
  return if @multithread

  # Compare against the owner recorded in #initialize.
  if Thread.current[:memcache_client] != self.object_id
    raise MemCacheError, <<-EOM
You are accessing this memcache-client instance from multiple threads but have not enabled multithread support.
Normally: MemCache.new(['localhost:11211'], :multithread => true)
In Rails: config.cache_store = [:mem_cache_store, 'localhost:11211', { :multithread => true }]
    EOM
  end
end
902
##
# This class represents a memcached server instance.

class Server

  ##
  # The amount of time to wait before attempting to re-establish a
  # connection with a server that is marked dead.

  RETRY_DELAY = 30.0

  ##
  # The host the memcached server is running on.

  attr_reader :host

  ##
  # The port the memcached server is listening on.

  attr_reader :port

  ##
  # The weight given to the server.

  attr_reader :weight

  ##
  # The time of next retry if the connection is dead.

  attr_reader :retry

  ##
  # A text status string describing the state of the server.

  attr_reader :status

  ##
  # Logger inherited from the owning MemCache instance (may be nil).

  attr_reader :logger

  ##
  # Create a new MemCache::Server object for the memcached instance
  # listening on the given host and port, weighted by the given weight.
  # Timeout and logger settings are copied from +memcache+.

  def initialize(memcache, host, port = DEFAULT_PORT, weight = DEFAULT_WEIGHT)
    raise ArgumentError, "No host specified" if host.nil? or host.empty?
    raise ArgumentError, "No port specified" if port.nil? or port.to_i.zero?

    @host = host
    @port = port.to_i
    @weight = weight.to_i

    @sock = nil
    @retry = nil
    @status = 'NOT CONNECTED'
    @timeout = memcache.timeout
    @logger = memcache.logger
  end

  ##
  # Return a string representation of the server object.

  def inspect
    "<MemCache::Server: %s:%d [%d] (%s)>" % [@host, @port, @weight, @status]
  end

  ##
  # Check whether the server connection is alive. This will cause the
  # socket to attempt to connect if it isn't already connected and or if
  # the server was previously marked as down and the retry time has
  # been exceeded.

  def alive?
    !!socket
  end

  ##
  # Try to connect to the memcached server targeted by this object.
  # Returns the connected socket object on success or nil on failure.

  def socket
    return @sock if @sock and not @sock.closed?

    @sock = nil

    # If the host was dead, don't retry for a while.
    return if @retry and @retry > Time.now

    # Attempt to connect if not already connected.
    begin
      @sock = connect_to(@host, @port, @timeout)
      # Disable Nagle's algorithm: requests are small and latency-sensitive.
      @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
      @retry = nil
      @status = 'CONNECTED'
    rescue SocketError, SystemCallError, IOError => err
      logger.warn { "Unable to open socket: #{err.class.name}, #{err.message}" } if logger
      mark_dead err
    end

    return @sock
  end

  ##
  # Opens a BufferedIO-wrapped TCP connection to host:port with the
  # given read timeout (nil disables the timeout).

  def connect_to(host, port, timeout=nil)
    io = MemCache::BufferedIO.new(TCPSocket.new(host, port))
    io.read_timeout = timeout
    io
  end

  ##
  # Close the connection to the memcached server targeted by this
  # object. The server is not considered dead.

  def close
    @sock.close if @sock && !@sock.closed?
    @sock = nil
    @retry = nil
    @status = "NOT CONNECTED"
  end

  ##
  # Mark the server as dead and close its socket. The server will not be
  # retried until RETRY_DELAY seconds from now.

  def mark_dead(error)
    @sock.close if @sock && !@sock.closed?
    @sock = nil
    @retry = Time.now + RETRY_DELAY

    reason = "#{error.class.name}: #{error.message}"
    @status = sprintf "%s:%s DEAD (%s), will retry at %s", @host, @port, reason, @retry
    @logger.info { @status } if @logger
  end

end
1034
##
# Base MemCache exception class. All errors raised by this client are
# MemCacheError (or wrapped into one by handle_error).

class MemCacheError < RuntimeError; end
1039
##
# Net::BufferedIO subclass used for all server sockets; adds a
# timeout-aware non-blocking fill, socket-option passthrough, and a
# line-oriented +gets+.

class BufferedIO < Net::BufferedIO # :nodoc:
  BUFSIZE = 1024 * 16

  # An implementation similar to this is in *trunk* for 1.9. When it
  # gets released, this method can be removed when using 1.9
  # Fills the read buffer without blocking; when the read would block,
  # waits up to @read_timeout via IO.select and raises Timeout::Error
  # if the socket never becomes readable.
  def rbuf_fill
    begin
      @rbuf << @io.read_nonblock(BUFSIZE)
    rescue Errno::EWOULDBLOCK
      retry unless @read_timeout
      if IO.select([@io], nil, nil, @read_timeout)
        retry
      else
        raise Timeout::Error, 'IO timeout'
      end
    end
  end

  # Forwards socket options to the underlying IO (used for TCP_NODELAY).
  def setsockopt *args
    @io.setsockopt *args
  end

  # Reads one "\n"-terminated protocol line via Net::BufferedIO#readuntil.
  def gets
    readuntil("\n")
  end
end
1066
1067 end
1068
##
# Consistent-hashing continuum shared by all MemCache instances.

module Continuum
  POINTS_PER_SERVER = 160 # this is the default in libmemcached

  ##
  # Find the closest index in +ary+ (sorted ascending by +value+) whose
  # value is <= the given +value+; returns -1 when +value+ is smaller
  # than every entry (callers wrap around via ary[-1]).

  def self.binary_search(ary, value, &block)
    lo = 0
    hi = ary.size - 1

    while lo <= hi
      mid = (lo + hi) / 2
      case ary[mid].value <=> value
      when 0 then return mid
      when 1 then hi = mid - 1
      else        lo = mid + 1
      end
    end

    hi
  end

  ##
  # A point on the continuum: a hash +value+ mapped to a +server+.

  class Entry
    attr_reader :value
    attr_reader :server

    def initialize(val, srv)
      @value = val
      @server = srv
    end

    def inspect
      "<#{value}, #{server.host}:#{server.port}>"
    end
  end

end