// Author: Pete Kirkham
// Date: 2007-10-31
// Wide-finder benchmark: block I/O, an 8-byte-wide line-break scan and a
// Boyer-Moore-Horspool prefix match, followed by a hand-written check of the
// rest of the request path.
// The newline scan uses the zero-byte-in-word test described at
// http://graphics.stanford.edu/~seander/bithacks.html#ZeroInWord
//
// Usage: <program> <log file> [number of worker threads, minimum 3]

#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <queue>
#include <string>
#include <utility>
#include <vector>

#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
#include <sys/types.h>
#include <unistd.h>

// Bytes requested from read() per chunk.  The 8-byte-wide scans below rely on
// the worker buffer size (2 * BLOCK_SIZE) being a multiple of 8.
static const size_t BLOCK_SIZE = 8096;

// Each worker buffer holds up to BLOCK_SIZE carried-over bytes (the unfinished
// last line of the previous chunk) followed by one BLOCK_SIZE read.
static const size_t BUFFER_SIZE = 2 * BLOCK_SIZE;

// map for counting matches: matched path text -> number of hits
typedef std::map<std::string, size_t> match_map;
typedef std::pair<std::string, size_t> matchling;

// Orders by descending count, which makes std::priority_queue a min-heap:
// top() is the entry with the smallest count.
struct order_by_second
{
    bool operator() (const matchling& left, const matchling& right) const
    {
        return left.second > right.second;
    }
};

typedef std::priority_queue<matchling, std::vector<matchling>, order_by_second> top_heap;

// Boyer-Moore-Horspool search for `pattern` in string[0, string_length).
// `table` is the bad-character table built in main(): for every byte value,
// the distance from its last occurrence in pattern[0, pattern_length - 1) to
// the pattern's final position, or pattern_length if it does not occur there.
// Returns the index of the first match, or string_length if there is none.
// Only bytes inside string[0, string_length) are read.
size_t prefix_search (const char* pattern, size_t pattern_length, const int* table,
                      const char* string, size_t string_length)
{
    if (pattern_length == 0)
        return 0;
    if (string_length < pattern_length)
        return string_length;

    const size_t pattern_last(pattern_length - 1);

    // Compare right to left; on a mismatch shift by the bad-character rule
    // applied at the mismatching position, but always by at least one byte.
    // The loop bound keeps every compared byte inside the string.
    for (size_t index(0); index + pattern_length <= string_length; ) {
        size_t scan(pattern_last);
        while (string[index + scan] == pattern[scan]) {
            if (scan == 0)
                return index;
            --scan;
        }
        const size_t jump(table[static_cast<unsigned char>(string[index + scan])]);
        const size_t matched(pattern_last - scan);
        index += (jump > matched) ? (jump - matched) : 1;
    }
    return string_length;
}

// Check one log line for "<pattern>dddx/dddd/dd/dd/<name> " ('d' = decimal
// digit) and, if present, count it in `matches` under the text starting five
// bytes after the prefix, i.e. "dddd/dd/dd/<name>".
// Only the first occurrence of the prefix is examined.  <name> must be at
// least three bytes long and end in a space before any '.' or '\n'.
// `line` need not be terminated; only line[0, line_length) is read.
inline void process_line (match_map& matches, const char* pattern, size_t pattern_length,
                          const int* table, const char* line, size_t line_length)
{
#ifdef ECHO_LINES
    std::cout << std::string(line, line_length) << std::endl;
#endif
    // Layout expected after the prefix: 'd' stands for any decimal digit,
    // every other character must match literally.
    static const char path_format[] = "dddx/dddd/dd/dd/";
    static const size_t format_length = sizeof(path_format) - 1;

    const size_t match(prefix_search(pattern, pattern_length, table, line, line_length)
                       + pattern_length);

    // Cheap early exit for lines too short to hold the date part and a name
    // (the exact requirements are enforced below).  Written as an addition so
    // that lines shorter than 18 bytes cannot make the bound wrap around; it
    // also keeps the format check below inside the line.
    if (match + 18 >= line_length)
        return;

    size_t index(match);
    for (size_t f(0); f < format_length; ++f, ++index) {
        const char ch(line[index]);
        const char want(path_format[f]);
        if (want == 'd' ? (ch < '0' || ch > '9') : (ch != want))
            return;
    }

    const size_t key_start(match + 5);  // skip the "dddx/" part
    for (; index < line_length; ++index) {
        const char ch(line[index]);
        if ((ch == '.') || (ch == '\n'))
            return;
        if (ch == ' ') {
            if (index > match + 18) {
                const std::string key(line + key_start, index - key_start);
                ++matches[key];
#ifdef LIST_MATCHES
                std::cout << key << std::endl;
#endif
            }
            return;
        }
    }
}

// True when at least one of the 8 bytes starting at p is '\n'.  XOR turns
// '\n' bytes into zero bytes, which the bithacks "haszero" expression detects.
// memcpy avoids alignment and aliasing problems; compilers emit a plain load.
inline bool has_newline_byte (const char* p)
{
    uint64_t data;
    memcpy(&data, p, sizeof(data));
    data ^= 0x0a0a0a0a0a0a0a0aULL;
    return ((data - 0x0101010101010101ULL) & ~data & 0x8080808080808080ULL) != 0;
}

// Split buf[0, chunk_length) at '\n' and run process_line on every line
// (without its '\n').  A trailing fragment with no '\n' is processed too if it
// is longer than the pattern.
// Whole 8-byte words are inspected, so buf must be readable up to
// chunk_length rounded up to a multiple of 8; bytes past chunk_length are
// never treated as line breaks.
void process_chunk (match_map& matches, const char* pattern, size_t pattern_length,
                    const int* table, const char* buf, size_t chunk_length)
{
    size_t line_start(0);

    for (size_t word(0); word < chunk_length; word += 8) {
        if (!has_newline_byte(buf + word))
            continue;
        // Handle every '\n' in this word, so short lines are not merged.
        const size_t word_end(std::min(word + 8, chunk_length));
        for (size_t i(word); i < word_end; ++i) {
            if (buf[i] == '\n') {
                process_line(matches, pattern, pattern_length, table,
                             buf + line_start, i - line_start);
                line_start = i + 1;
            }
        }
    }

    // Written as an addition so a chunk shorter than the pattern cannot wrap.
    if (chunk_length > line_start + pattern_length) {
        process_line(matches, pattern, pattern_length, table,
                     buf + line_start, chunk_length - line_start);
    }
}

// Collect the k_desired entries of `matches` with the largest counts into
// `top_k` (a min-heap, so top() is the smallest count kept).  O(N log k).
// An entry whose count only ties the current minimum does not replace it.
void find_top_matches (const size_t k_desired, top_heap& top_k, const match_map& matches)
{
    if (k_desired == 0)
        return;

    for (match_map::const_iterator it(matches.begin()); it != matches.end(); ++it) {
        if (top_k.size() < k_desired) {
            top_k.push(*it);
        } else if (it->second > top_k.top().second) {
            top_k.pop();
            top_k.push(*it);
        }
    }
}

void* run_worker (void*);

// A worker owns a chunk buffer and a thread.  While the worker is `available`
// the main thread may fill its buffer; process() hands the buffer over, the
// worker thread scans it into its private match map, becomes `available`
// again and signals the shared available_cond.  sum_matches() stops the
// thread and merges its counts.
//
// Locking: _mutex guards _state and _chunk_length.  The shared
// available_mutex / available_cond pair wakes the main thread.  A worker never
// holds _mutex while taking available_mutex, so callers may poll
// is_available() while holding available_mutex without risking deadlock.
class worker
{
private:
    enum worker_state { available, data_ready, processing, complete };

public:
    worker (const char* pattern, size_t pattern_length, const int* table,
            pthread_mutex_t& available_mutex, pthread_cond_t& available_cond)
        : _state(available),
          _available_mutex(available_mutex),
          _available_cond(available_cond),
          _pattern(pattern),
          _pattern_length(pattern_length),
          _table(table),
          _chunk_length(0),
          _buf(reinterpret_cast<char*>(_mem))
    {
        // The 8-byte-wide scans may read a few bytes past the end of a chunk;
        // zeroing the buffer keeps those reads from touching indeterminate data.
        memset(_mem, 0, sizeof(_mem));
        pthread_mutex_init(&_mutex, NULL);
        pthread_cond_init(&_condition, NULL);
    }

    ~worker ()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_condition);
    }

    // not protected - assumed that only called in correct state
    match_map& get_matches () { return _matches; }

    // Buffer of BUFFER_SIZE bytes; only the main thread writes to it, and only
    // while this worker is available.
    char* get_buffer () { return _buf; }

    // Hand the first chunk_length bytes of the buffer to the worker thread.
    // Exits the program if the worker is not currently available.
    void process (size_t chunk_length)
    {
        pthread_mutex_lock(&_mutex);
        if (_state != available) {
            std::cerr<< "process: invalid state " << std::endl;
            exit(-2);
        }
        _chunk_length = chunk_length;
        _state = data_ready;
        pthread_cond_signal(&_condition);
        pthread_mutex_unlock(&_mutex);
    }

    bool is_available ()
    {
        pthread_mutex_lock(&_mutex);
        const bool result(_state == available);
        pthread_mutex_unlock(&_mutex);
        return result;
    }

    // Wait until the worker is idle, stop and join its thread, then add its
    // counts into `total`.
    void sum_matches (match_map& total)
    {
        // The state is re-checked under available_mutex, so a worker's
        // "available" signal cannot slip in between the check and the wait.
        pthread_mutex_lock(&_available_mutex);
        while (!is_available())
            pthread_cond_wait(&_available_cond, &_available_mutex);
        pthread_mutex_unlock(&_available_mutex);

        pthread_mutex_lock(&_mutex);
        _state = complete;
        pthread_cond_signal(&_condition);
        pthread_mutex_unlock(&_mutex);

        pthread_join(_thread, NULL);

        for (match_map::const_iterator it(_matches.begin()); it != _matches.end(); ++it)
            total[it->first] += it->second;
    }

    // Start the (joinable, by default) worker thread; exits on failure.
    void start_thread ()
    {
        if (pthread_create(&_thread, NULL, &run_worker, static_cast<void*>(this)) != 0) {
            std::cerr << "start_thread: pthread_create failed" << std::endl;
            exit(-3);
        }
    }

    // Thread body: process each handed-over chunk until told to complete.
    void run ()
    {
        for (;;) {
            pthread_mutex_lock(&_mutex);
            while ((_state != data_ready) && (_state != complete))
                pthread_cond_wait(&_condition, &_mutex);
            if (_state == complete) {
                pthread_mutex_unlock(&_mutex);
                return;
            }
            _state = processing;
            const size_t chunk_length(_chunk_length);
            pthread_mutex_unlock(&_mutex);

            process_chunk(_matches, _pattern, _pattern_length, _table, _buf, chunk_length);

            // Publish the new state first, then signal without holding _mutex
            // (see the locking note on the class).
            pthread_mutex_lock(&_mutex);
            _state = available;
            pthread_mutex_unlock(&_mutex);

            pthread_mutex_lock(&_available_mutex);
            pthread_cond_signal(&_available_cond);
            pthread_mutex_unlock(&_available_mutex);
        }
    }

private:
    // not copyable: owns a thread, a mutex and a condition variable
    worker (const worker&);
    worker& operator= (const worker&);

    worker_state _state;
    pthread_t _thread;
    pthread_mutex_t _mutex;
    pthread_cond_t _condition;
    // communication when becoming available
    pthread_mutex_t& _available_mutex;
    pthread_cond_t& _available_cond;
    // internal matching state
    match_map _matches;
    const char* _pattern;
    size_t _pattern_length;
    const int* _table;
    size_t _chunk_length;
    char* _buf;
    uint64_t _mem[BUFFER_SIZE / sizeof(uint64_t)]; // wider type to ensure alignment
};

void* run_worker (void* w)
{
    static_cast<worker*>(w)->run();
    return NULL;
}

typedef std::vector<worker*> worker_vector;

// Block until some worker other than `current` is available and return it.
// Workers are tried round-robin starting at `next_worker_id`, which is moved
// past the worker returned.  available_mutex is held while polling, so an
// availability signal sent between the poll and the wait is not lost.
worker* wait_for_next_worker (const worker_vector& workers, const worker* current,
                              size_t& next_worker_id,
                              pthread_mutex_t& available_mutex, pthread_cond_t& available_cond)
{
    const size_t n_workers(workers.size());
    worker* found(NULL);

    pthread_mutex_lock(&available_mutex);
    for (;;) {
        for (size_t i(0); i < n_workers; ++i) {
            worker* w(workers[(i + next_worker_id) % n_workers]);
            if ((w != current) && w->is_available()) {
                found = w;
                next_worker_id = (i + next_worker_id + 1) % n_workers;
                break;
            }
        }
        if (found)
            break;
        pthread_cond_wait(&available_cond, &available_mutex);
    }
    pthread_mutex_unlock(&available_mutex);
    return found;
}

// Index of the last '\n' in buf[0, length), or `length` if there is none.
size_t find_last_newline (const char* buf, size_t length)
{
    for (size_t i(length); i > 0; --i) {
        if (buf[i - 1] == '\n')
            return i - 1;
    }
    return length;
}

// main: count "GET /ongoing/When/..." hits in the file named by argv[1]
// using argv[2] (default 4, minimum 3) worker threads, and print the ten
// most frequent as "<count>: <path>".
int main (int narg, char** argvp)
{
    if (narg < 2) {
        std::cout << "The name of the input file is required." << std::endl;
        return 1;
    }

    // open the file; open() reports failure with -1 (0 is a valid descriptor)
    const int file(open(argvp[1], O_RDONLY));
    if (file < 0) {
        std::cout << "failed to open " << argvp[1] << std::endl;
        return 2;
    }

    // Number of worker threads.  At least two are needed (the buffer being
    // filled and the one receiving the carried-over tail); three is enforced.
    // Parsed as a signed value so that a negative argument cannot wrap.
    size_t n_workers(4);
    if (narg > 2) {
        const long requested(strtol(argvp[2], NULL, 10));
        n_workers = (requested < 3) ? 3 : static_cast<size_t>(requested);
    }

    // prefix of pattern to find
    const char* pattern("GET /ongoing/When/");
    const size_t pattern_length(strlen(pattern));

    // Boyer-Moore-Horspool bad-character table (see prefix_search)
    int table[256];
    for (size_t c(0); c < 256; ++c)
        table[c] = static_cast<int>(pattern_length);
    for (size_t scan(0); scan + 1 < pattern_length; ++scan)
        table[static_cast<unsigned char>(pattern[scan])] = static_cast<int>(pattern_length - 1 - scan);

    // Each worker advertises its buffer and waits for a chunk to be read into
    // it; sum_matches joins its thread and adds its matches to the total.
    worker_vector workers;
    pthread_mutex_t available_mutex;
    pthread_cond_t available_cond;
    pthread_mutex_init(&available_mutex, NULL);
    pthread_cond_init(&available_cond, NULL);

    for (size_t i(0); i < n_workers; ++i) {
        worker* w(new worker(pattern, pattern_length, table, available_mutex, available_cond));
        workers.push_back(w);
        w->start_thread();
    }

    // Block I/O: the unfinished last line of each chunk is copied to the start
    // of the next worker's buffer and the following read is appended to it.
    size_t next_worker_id(1);
    worker* current_worker(workers[0]);
    size_t bytes_carried(0);
    match_map matches;  // total matches
    int status(0);

    for (;;) {
        char* buf(current_worker->get_buffer());

        // The next worker must be free before reading, because the tail of
        // this chunk is copied into its buffer.
        worker* next_worker(wait_for_next_worker(workers, current_worker, next_worker_id,
                                                 available_mutex, available_cond));

        const ssize_t bytes_read(read(file, buf + bytes_carried, BLOCK_SIZE));
        if (bytes_read <= 0) {
            if (bytes_read < 0) {
                std::cerr << "failed to read " << argvp[1] << std::endl;
                status = 3;
            }
            // if there is any last line, process that
            if (bytes_carried)
                process_line(matches, pattern, pattern_length, table, buf, bytes_carried);
            break;
        }

        const size_t chunk_length(bytes_carried + static_cast<size_t>(bytes_read));

        // Dispatch up to and including the last line break.  If there is no
        // line break, or the tail is too long to carry without overflowing the
        // next buffer, dispatch the whole chunk (a line split this way may be
        // missed, but no data is overwritten).
        const size_t last_newline(find_last_newline(buf, chunk_length));
        size_t dispatch_length(chunk_length);
        if ((last_newline < chunk_length) && (chunk_length - (last_newline + 1) <= BLOCK_SIZE))
            dispatch_length = last_newline + 1;

        bytes_carried = chunk_length - dispatch_length;
        if (bytes_carried) {
            // Both this thread and the worker's thread read the current buffer;
            // only the main thread writes to it.
            memcpy(next_worker->get_buffer(), buf + dispatch_length, bytes_carried);
        }

        current_worker->process(dispatch_length);
        current_worker = next_worker;
    }

    close(file);

    // sum matches from all workers and delete them
    for (worker_vector::const_iterator it(workers.begin()); it != workers.end(); ++it) {
        (*it)->sum_matches(matches);
        delete (*it);
    }

    pthread_mutex_destroy(&available_mutex);
    pthread_cond_destroy(&available_cond);

    const size_t k_desired(10);
    top_heap top_k;
    find_top_matches(k_desired, top_k, matches);

    // output them, reversing the (ascending) order in which the heap pops
    std::vector<matchling> out_list;
    while (!top_k.empty()) {
        out_list.push_back(top_k.top());
        top_k.pop();
    }
    std::reverse(out_list.begin(), out_list.end());

    for (std::vector<matchling>::const_iterator it(out_list.begin()); it != out_list.end(); ++it)
        std::cout << it->second << ": " << it->first << std::endl;

    return status;
}