00001
00002
00008 #ifdef HAVE_CONFIG_H
00009 #include "config.h"
00010 #endif
00011
00012 #include <sys/types.h>
00013 #include <sys/socket.h>
00014 #include <sys/time.h>
00015 #include <netinet/in.h>
00016 #include <event.h>
00017 #include <netdb.h>
00018 #include <stdbool.h>
00019 #include <stdint.h>
00020 #include <pthread.h>
00021
00022 #include "protocol_binary.h"
00023 #include "cache.h"
00024
/*
 * Sizing constants for keys, I/O buffers and UDP framing.
 * NOTE(review): units look like bytes judging by the names; confirm at the
 * use sites, which are outside this header.
 */

/* Maximum length of a key. */
#define KEY_MAX_LENGTH 250

/* Storage reserved for an incr/decr result (see the `buf` argument of
 * do_add_delta/add_delta) -- TODO confirm against the implementation. */
#define INCR_MAX_STORAGE_LEN 24

#define DATA_BUFFER_SIZE 2048
#define UDP_READ_BUFFER_SIZE 65536
#define UDP_MAX_PAYLOAD_SIZE 1400
#define UDP_HEADER_SIZE 8
/* 256 MiB upper bound; the arithmetic stays within a 32-bit int. */
#define MAX_SENDBUF_SIZE (256 * 1024 * 1024)
00036
00037
/* Size of one suffix buffer -- presumably the elements tracked by
 * conn.suffixlist; confirm in the implementation. */
#define SUFFIX_SIZE 24

/*
 * Initial capacities. The names pair up with the per-connection arrays in
 * struct conn (ilist, suffixlist, iov, msglist) -- NOTE(review): the pairing
 * is inferred from names; verify where the arrays are allocated.
 */
#define ITEM_LIST_INITIAL 200
#define SUFFIX_LIST_INITIAL 20
#define IOV_LIST_INITIAL 400
#define MSG_LIST_INITIAL 10

/*
 * High-water marks for the same buffers/lists. Presumably the thresholds
 * above which they are shrunk back -- TODO confirm.
 */
#define READ_BUFFER_HIGHWAT 8192
#define ITEM_LIST_HIGHWAT 400
#define IOV_LIST_HIGHWAT 600
#define MSG_LIST_HIGHWAT 100
00057
00058
/* Minimum length of a binary-protocol packet, in bytes. */
#define MIN_BIN_PKT_LENGTH 16
/* The same length expressed in 32-bit words (16 / 4 == 4); type is size_t. */
#define BIN_PKT_HDR_WORDS (MIN_BIN_PKT_LENGTH/sizeof(uint32_t))
00061
00062
00063 #if HAVE_UNISTD_H
00064 # include <unistd.h>
00065 #endif
00066
00067
/*
 * Slab allocator parameters.
 * POWER_SMALLEST..POWER_LARGEST bound the slab class ids;
 * MAX_NUMBER_OF_SLAB_CLASSES (POWER_LARGEST + 1 == 201) sizes per-class
 * arrays such as thread_stats.slab_stats[].
 */
#define POWER_SMALLEST 1
#define POWER_LARGEST  200
#define CHUNK_ALIGN_BYTES 8
/* Defined with no value: only meaningful to #ifdef/#ifndef tests. */
#define DONT_PREALLOC_SLABS
#define MAX_NUMBER_OF_SLAB_CLASSES (POWER_LARGEST + 1)

/* 3 * 3600 == 10800. NOTE(review): presumably seconds (three hours); the
 * consumer of this constant is not visible here -- confirm. */
#define TAIL_REPAIR_TIME (3 * 3600)
00077
00078
/*
 * Accessors for the variable-length tail of an item (its `end[]` member).
 * As computed by the macros below, the tail holds, in order:
 *
 *   [ 8-byte CAS value ]   only when ITEM_CAS is set in it_flags
 *   [ key ]                nkey bytes plus one extra byte
 *   [ suffix ]             nsuffix bytes
 *   [ data ]               nbytes bytes
 *
 * Every macro evaluates its item argument more than once, so never pass an
 * expression with side effects.
 */

/* CAS value stored in the item, or 0 when the item has no CAS field. */
#define ITEM_get_cas(i) ((uint64_t)(((i)->it_flags & ITEM_CAS) ? \
                                    *(uint64_t*)&((i)->end[0]) : 0x0))

/*
 * Store v as the item's CAS value; does nothing when the item has no CAS
 * field. Wrapped in do/while(0) so it behaves as a single statement (safe
 * under an unbraced if/else); the caller supplies the trailing semicolon.
 */
#define ITEM_set_cas(i,v) do { \
    if ((i)->it_flags & ITEM_CAS) { \
        *(uint64_t*)&((i)->end[0]) = (v); \
    } \
} while (0)

/* Pointer to the first byte of the key. */
#define ITEM_key(item) (((char*)&((item)->end[0])) \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* Pointer to the suffix, which starts nkey + 1 bytes after the key. */
#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* Pointer to the data, which follows the suffix. */
#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 \
         + (item)->nsuffix \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))

/* Total footprint of the item: fixed header plus every tail section. */
#define ITEM_ntotal(item) (sizeof(struct _stritem) + (item)->nkey + 1 \
         + (item)->nsuffix + (item)->nbytes \
         + (((item)->it_flags & ITEM_CAS) ? sizeof(uint64_t) : 0))
00097
/* Sizes of the scratch buffers used by the APPEND_NUM_* macros below. */
#define STAT_KEY_LEN 128
#define STAT_VAL_LEN 128

/*
 * Convenience wrapper around append_stat().
 * Relies on `add_stats` and `c` being in scope at the expansion site.
 * The caller supplies the terminating semicolon.
 */
#define APPEND_STAT(name, fmt, val) \
    append_stat(name, add_stats, c, fmt, val)

/*
 * Format a key from (name_fmt, num, name) and a value from (fmt, val), then
 * hand both to add_stats().
 * Relies on these names being in scope at the expansion site: key_str,
 * val_str (buffers of at least STAT_KEY_LEN / STAT_VAL_LEN bytes), klen,
 * vlen, add_stats and c.
 * Wrapped in do/while(0) so all three steps stay together when the macro is
 * the body of an unbraced if/else/loop.
 */
#define APPEND_NUM_FMT_STAT(name_fmt, num, name, fmt, val) \
    do { \
        klen = snprintf(key_str, STAT_KEY_LEN, name_fmt, num, name); \
        vlen = snprintf(val_str, STAT_VAL_LEN, fmt, val); \
        add_stats(key_str, klen, val_str, vlen, c); \
    } while (0)

/* Same as APPEND_NUM_FMT_STAT with the key formatted as "<num>:<name>". */
#define APPEND_NUM_STAT(num, name, fmt, val) \
    APPEND_NUM_FMT_STAT("%d:%s", num, name, fmt, val)
00115
/*
 * Callback type used to emit one statistic.
 *
 * key/klen   stat name and its length
 * val/vlen   stat value (already formatted as text) and its length
 * cookie     opaque pointer passed through from the caller (the APPEND_*
 *            macros pass the in-scope `c`)
 */
typedef void (*ADD_STAT)(const char *key, const uint16_t klen,
                         const char *val, const uint32_t vlen,
                         const void *cookie);
00128
00129
00130
00131
/*
 * States of a connection; stored in conn.state and conn.write_and_go.
 * NOTE(review): the per-state notes are inferred from the names only --
 * confirm against the state machine implementation.
 */
enum conn_states {
    conn_listening,  /* presumably: listening socket */
    conn_new_cmd,    /* presumably: ready for the next command */
    conn_waiting,    /* presumably: waiting for the socket */
    conn_read,       /* presumably: reading input */
    conn_parse_cmd,  /* presumably: parsing a command */
    conn_write,      /* presumably: writing a response */
    conn_nread,      /* presumably: reading a fixed number of bytes */
    conn_swallow,    /* presumably: discarding bytes (see conn.sbytes) */
    conn_closing,    /* presumably: closing the connection */
    conn_mwrite,     /* presumably: multi-message write (see conn.msglist) */
    conn_max_state   /* sentinel: one past the last real state (== 10) */
};
00148
/*
 * Sub-state stored in conn.substate.
 * NOTE(review): presumably only meaningful for binary-protocol connections
 * (the "bin_" prefix); confirm in the implementation.
 */
enum bin_substates {
    bin_no_state,
    bin_reading_set_header,
    bin_reading_cas_header,
    bin_read_set_value,
    bin_reading_get_key,
    bin_reading_stat,
    bin_reading_del_header,
    bin_reading_incr_header,
    bin_read_flush_exptime
};
00160
/*
 * Wire protocol of a connection (conn.protocol) or of the listening
 * configuration (settings.binding_protocol).
 * Values deliberately start at 3, so 0 is not a valid protocol.
 */
enum protocol {
    ascii_prot = 3,    /* text protocol */
    binary_prot,       /* binary protocol (== 4) */
    negotiating_prot   /* not yet decided (== 5) */
};
00166
/* Transport a connection runs over (conn.transport). */
enum network_transport {
    local_transport,   /* NOTE(review): presumably a Unix domain socket (see settings.socketpath) */
    tcp_transport,
    udp_transport
};

/* Non-zero when x is udp_transport. The argument is parenthesized so that
 * compound expressions (e.g. a ?: or & expression) compare as a whole. */
#define IS_UDP(x) ((x) == udp_transport)
00174
/*
 * Storage command codes.
 * NOTE(review): presumably the values passed as the `comm` argument of
 * store_item()/do_store_item() -- confirm at the call sites.
 */
#define NREAD_ADD 1
#define NREAD_SET 2
#define NREAD_REPLACE 3
#define NREAD_APPEND 4
#define NREAD_PREPEND 5
#define NREAD_CAS 6
00181
/* Result of store_item() / do_store_item(). NOT_STORED is explicitly 0. */
enum store_item_type {
    NOT_STORED = 0,
    STORED,
    EXISTS,
    NOT_FOUND
};
00185
/*
 * Result of add_delta() / do_add_delta().
 * NOTE(review): EOM presumably means "out of memory" -- confirm.
 */
enum delta_result_type {
    OK,
    NON_NUMERIC,
    EOM
};
00189
/*
 * Unsigned time value used by item.time, item.exptime, settings.oldest_live
 * and current_time.
 * NOTE(review): the name suggests time relative to some reference point
 * (likely process start); confirm where current_time is maintained.
 */
typedef unsigned int rel_time_t;
00192
/* Per-slab-class counters; thread_stats holds one of these per slab class. */
struct slab_stats {
    uint64_t  set_cmds;
    uint64_t  get_hits;
    uint64_t  delete_hits;
    uint64_t  cas_hits;
    uint64_t  cas_badval;
    uint64_t  incr_hits;
    uint64_t  decr_hits;
};
00203
00207 struct thread_stats {
00208 pthread_mutex_t mutex;
00209 uint64_t get_cmds;
00210 uint64_t get_misses;
00211 uint64_t delete_misses;
00212 uint64_t incr_misses;
00213 uint64_t decr_misses;
00214 uint64_t cas_misses;
00215 uint64_t bytes_read;
00216 uint64_t bytes_written;
00217 uint64_t flush_cmds;
00218 uint64_t conn_yields;
00219 struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
00220 };
00221
/*
 * Process-wide statistics (the single instance is declared below as the
 * global `stats`).
 * NOTE(review): `mutex` presumably backs STATS_LOCK()/STATS_UNLOCK(); confirm
 * in the implementation.
 */
struct stats {
    pthread_mutex_t mutex;
    unsigned int  curr_items;
    unsigned int  total_items;
    uint64_t      curr_bytes;
    unsigned int  curr_conns;
    unsigned int  total_conns;
    unsigned int  conn_structs;
    uint64_t      get_cmds;
    uint64_t      set_cmds;
    uint64_t      get_hits;
    uint64_t      get_misses;
    uint64_t      evictions;
    time_t        started;             /* NOTE(review): presumably server start time -- confirm */
    bool          accepting_conns;     /* NOTE(review): presumably toggled by accept_new_conns() -- confirm */
    uint64_t      listen_disabled_num;
};
00242
/* Upper bound for a verbosity level (presumably settings.verbose -- confirm). */
#define MAX_VERBOSITY_LEVEL 2
00244
00245
00249 struct settings {
00250 size_t maxbytes;
00251 int maxconns;
00252 int port;
00253 int udpport;
00254 char *inter;
00255 int verbose;
00256 rel_time_t oldest_live;
00257 int evict_to_free;
00258 char *socketpath;
00259 int access;
00260 double factor;
00261 int chunk_size;
00262 int num_threads;
00263 char prefix_delimiter;
00264 int detail_enabled;
00265 int reqs_per_event;
00266
00267 bool use_cas;
00268 enum protocol binding_protocol;
00269 int backlog;
00270 int item_size_max;
00271 };
00272
/* Global state, defined once in an implementation file. */
extern struct stats stats;
extern time_t process_started;
extern struct settings settings;
00276
/* Bit flags stored in item.it_flags. */
#define ITEM_LINKED 1
/* The item's tail starts with an 8-byte CAS value (see the ITEM_* macros). */
#define ITEM_CAS 2

/* NOTE(review): presumably marks an item that sits on a slab free list --
 * confirm in the slab allocator. */
#define ITEM_SLABBED 4
00282
00286 typedef struct _stritem {
00287 struct _stritem *next;
00288 struct _stritem *prev;
00289 struct _stritem *h_next;
00290 rel_time_t time;
00291 rel_time_t exptime;
00292 int nbytes;
00293 unsigned short refcount;
00294 uint8_t nsuffix;
00295 uint8_t it_flags;
00296 uint8_t slabs_clsid;
00297 uint8_t nkey;
00298 void * end[];
00299
00300
00301
00302
00303 } item;
00304
00305 typedef struct {
00306 pthread_t thread_id;
00307 struct event_base *base;
00308 struct event notify_event;
00309 int notify_receive_fd;
00310 int notify_send_fd;
00311 struct thread_stats stats;
00312 struct conn_queue *new_conn_queue;
00313 cache_t *suffix_cache;
00314 } LIBEVENT_THREAD;
00315
/* Identity and event base of the dispatcher thread. */
typedef struct {
    pthread_t thread_id;
    struct event_base *base;
} LIBEVENT_DISPATCHER_THREAD;
00320
00324 typedef struct conn conn;
00325 struct conn {
00326 int sfd;
00327 enum conn_states state;
00328 enum bin_substates substate;
00329 struct event event;
00330 short ev_flags;
00331 short which;
00333 char *rbuf;
00334 char *rcurr;
00335 int rsize;
00336 int rbytes;
00338 char *wbuf;
00339 char *wcurr;
00340 int wsize;
00341 int wbytes;
00343 enum conn_states write_and_go;
00344 void *write_and_free;
00346 char *ritem;
00347 int rlbytes;
00348
00349
00350
00357 void *item;
00358
00359
00360 int sbytes;
00361
00362
00363 struct iovec *iov;
00364 int iovsize;
00365 int iovused;
00366
00367 struct msghdr *msglist;
00368 int msgsize;
00369 int msgused;
00370 int msgcurr;
00371 int msgbytes;
00372
00373 item **ilist;
00374 int isize;
00375 item **icurr;
00376 int ileft;
00377
00378 char **suffixlist;
00379 int suffixsize;
00380 char **suffixcurr;
00381 int suffixleft;
00382
00383 enum protocol protocol;
00384 enum network_transport transport;
00385
00386
00387 int request_id;
00388 struct sockaddr request_addr;
00389 socklen_t request_addr_size;
00390 unsigned char *hdrbuf;
00391 int hdrsize;
00392
00393 bool noreply;
00394
00395 struct {
00396 char *buffer;
00397 size_t size;
00398 size_t offset;
00399 } stats;
00400
00401
00402
00403 protocol_binary_request_header binary_header;
00404 uint64_t cas;
00405 short cmd;
00406 int opaque;
00407 int keylen;
00408 conn *next;
00409 LIBEVENT_THREAD *thread;
00410 };
00411
00412
00413
00414 extern volatile rel_time_t current_time;
00415
00416
00417
00418
00419 void do_accept_new_conns(const bool do_accept);
00420 enum delta_result_type do_add_delta(conn *c, item *item, const bool incr,
00421 const int64_t delta, char *buf);
00422 enum store_item_type do_store_item(item *item, int comm, conn* c);
00423 conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base);
00424 extern int daemonize(int nochdir, int noclose);
00425
00426
00427 #include "stats.h"
00428 #include "slabs.h"
00429 #include "assoc.h"
00430 #include "items.h"
00431 #include "trace.h"
00432 #include "hash.h"
00433 #include "util.h"
00434
00435
00436
00437
00438
00439
00440
00441
00442 void thread_init(int nthreads, struct event_base *main_base);
00443 int dispatch_event_add(int thread, conn *c);
00444 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport);
00445
00446
00447 enum delta_result_type add_delta(conn *c, item *item, const int incr,
00448 const int64_t delta, char *buf);
00449 void accept_new_conns(const bool do_accept);
00450 conn *conn_from_freelist(void);
00451 bool conn_add_to_freelist(conn *c);
00452 int is_listen_thread(void);
00453 item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes);
00454 char *item_cachedump(const unsigned int slabs_clsid, const unsigned int limit, unsigned int *bytes);
00455 void item_flush_expired(void);
00456 item *item_get(const char *key, const size_t nkey);
00457 int item_link(item *it);
00458 void item_remove(item *it);
00459 int item_replace(item *it, item *new_it);
00460 void item_stats(ADD_STAT add_stats, void *c);
00461 void item_stats_sizes(ADD_STAT add_stats, void *c);
00462 void item_unlink(item *it);
00463 void item_update(item *it);
00464
00465 void STATS_LOCK(void);
00466 void STATS_UNLOCK(void);
00467 void threadlocal_stats_reset(void);
00468 void threadlocal_stats_aggregate(struct thread_stats *stats);
00469 void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out);
00470
00471
00472 void append_stat(const char *name, ADD_STAT add_stats, conn *c,
00473 const char *fmt, ...);
00474
00475 enum store_item_type store_item(item *item, int comm, conn *c);
00476
/*
 * drop_privileges() is a real function when HAVE_DROP_PRIVILEGES is set;
 * otherwise it is a no-op. The no-op expands to ((void)0) rather than to
 * nothing, so a call remains a valid expression in every context.
 */
#if HAVE_DROP_PRIVILEGES
extern void drop_privileges(void);
#else
#define drop_privileges() ((void)0)
#endif
00482
00483
/*
 * Branch prediction hints. When the compiler is not GCC-compatible (or is
 * GCC older than 2.96), __builtin_expect is replaced by a macro that just
 * yields its first argument, so likely()/unlikely() cost nothing.
 * Both macros evaluate to the value of x.
 */
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
#define __builtin_expect(x, expected_value) (x)
#endif

#define likely(x)       __builtin_expect((x),1)
#define unlikely(x)     __builtin_expect((x),0)