Drizzled Public API Documentation

thr_lock.cc
1 /* Copyright (C) 2000 MySQL AB
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 /*
17 Read and write locks for POSIX threads. Every thread must acquire
18 all the locks it needs through thr_multi_lock() to avoid deadlocks.
19 A lock consists of a master lock (THR_LOCK), and lock instances
20 (THR_LOCK_DATA).
21 Any thread can have any number of lock instances (reads and writes) on
22 any lock. All lock instances must be freed.
23 Locks are prioritized according to:
24 
25 The current lock types are:
26 
27 TL_READ # Low priority read
28 TL_READ_WITH_SHARED_LOCKS
29 TL_READ_NO_INSERT # Read without concurrent inserts
30 TL_WRITE_ALLOW_WRITE # Write lock that allows other writers
31 TL_WRITE_ALLOW_READ # Write lock, but allow reading
32 TL_WRITE_CONCURRENT_INSERT
33  # Insert that can be mixed with selects
34 TL_WRITE # High priority write
35 TL_WRITE_ONLY # High priority write
36  # Abort all new lock requests with an error
37 
38 Locks are prioritized according to:
39 
40 WRITE_ALLOW_WRITE, WRITE_ALLOW_READ, WRITE_CONCURRENT_INSERT, WRITE_DELAYED,
41 WRITE_LOW_PRIORITY, READ, WRITE, READ_HIGH_PRIORITY and WRITE_ONLY
42 
43 Locks in the same privilege level are scheduled in first-in-first-out order.
44 
45 To allow concurrent read/writes locks, with 'WRITE_CONCURRENT_INSERT' one
46 should put a pointer to the following functions in the lock structure:
47 (If the pointer is zero (default), the function is not called)
48 
49 
50 The lock algorithm allows one to have one TL_WRITE_ALLOW_READ,
51 TL_WRITE_CONCURRENT_INSERT lock at the same time as multiple read locks.
52 
53 */
54 
55 #include <config.h>
56 #include <drizzled/internal/my_sys.h>
57 #include <drizzled/internal/thread_var.h>
58 #include <drizzled/statistics_variables.h>
59 #include <drizzled/pthread_globals.h>
60 
61 #include <drizzled/session.h>
62 
63 #include "thr_lock.h"
64 #include <drizzled/internal/m_string.h>
65 #include <errno.h>
66 #include <list>
67 
68 #if TIME_WITH_SYS_TIME
69 # include <sys/time.h>
70 # include <time.h>
71 #else
72 # if HAVE_SYS_TIME_H
73 # include <sys/time.h>
74 # else
75 # include <time.h>
76 # endif
77 #endif
78 
79 #include <drizzled/util/test.h>
80 
81 #include <boost/interprocess/sync/lock_options.hpp>
82 
83 using namespace std;
84 
85 namespace drizzled {
86 
87 uint64_t table_lock_wait_timeout;
88 static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
89 
90 
91 uint64_t max_write_lock_count= UINT64_MAX;
92 
93 /*
94 ** For the future (now the thread-specific cond is allocated by my_pthread.c)
95 */
96 
97 static inline bool
98 thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
99 {
100  return rhs == lhs;
101 }
102 
103 
104  /* Initialize a lock */
105 
106 void thr_lock_init(THR_LOCK *lock)
107 {
108  lock->read.last= &lock->read.data;
109  lock->read_wait.last= &lock->read_wait.data;
110  lock->write_wait.last= &lock->write_wait.data;
111  lock->write.last= &lock->write.data;
112 }
113 
114 
115 void THR_LOCK_INFO::init()
116 {
117  thread_id= internal::my_thread_var2()->id;
118  n_cursors= 0;
119 }
120 
121  /* Initialize a lock instance */
122 
123 void THR_LOCK_DATA::init(THR_LOCK *lock_arg, void *param_arg)
124 {
125  lock= lock_arg;
126  type= TL_UNLOCK;
127  owner= NULL; /* no owner yet */
128  status_param= param_arg;
129  cond= NULL;
130 }
131 
132 
133 static inline bool
134 have_old_read_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner)
135 {
136  for ( ; data ; data=data->next)
137  {
138  if (thr_lock_owner_equal(data->owner, owner))
139  return true; /* Already locked by thread */
140  }
141  return false;
142 }
143 
144 static void wake_up_waiters(THR_LOCK *lock);
145 
146 
147 static enum enum_thr_lock_result wait_for_lock(Session &session, struct st_lock_list *wait, THR_LOCK_DATA *data)
148 {
149  boost::condition_variable_any *cond= &session.getThreadVar()->suspend;
150  enum enum_thr_lock_result result= THR_LOCK_ABORTED;
151  bool can_deadlock= test(data->owner->info->n_cursors);
152 
153  {
154  (*wait->last)=data; /* Wait for lock */
155  data->prev= wait->last;
156  wait->last= &data->next;
157  }
158 
159  current_global_counters.locks_waited++;
160 
161  /* Set up control struct to allow others to abort locks */
162  session.getThreadVar()->current_mutex= data->lock->native_handle();
163  session.getThreadVar()->current_cond= &session.getThreadVar()->suspend;
164  data->cond= &session.getThreadVar()->suspend;;
165 
166  while (not session.getThreadVar()->abort)
167  {
168  boost::mutex::scoped_lock scoped(*data->lock->native_handle(), boost::adopt_lock_t());
169 
170  if (can_deadlock)
171  {
172  boost::xtime xt;
173  xtime_get(&xt, boost::TIME_UTC_);
174  xt.sec += table_lock_wait_timeout;
175  if (not cond->timed_wait(scoped, xt))
176  {
177  result= THR_LOCK_WAIT_TIMEOUT;
178  scoped.release();
179  break;
180  }
181  }
182  else
183  {
184  cond->wait(scoped);
185  }
186  /*
187  We must break the wait if one of the following occurs:
188  - the connection has been aborted (!session.getThreadVar()->abort), but
189  this is not a delayed insert thread (in_wait_list). For a delayed
190  insert thread the proper action at shutdown is, apparently, to
191  acquire the lock and complete the insert.
192  - the lock has been granted (data->cond is set to NULL by the granter),
193  or the waiting has been aborted (additionally data->type is set to
194  TL_UNLOCK).
195  - the wait has timed out (rc == ETIMEDOUT)
196  Order of checks below is important to not report about timeout
197  if the predicate is true.
198  */
199  if (data->cond == NULL)
200  {
201  scoped.release();
202  break;
203  }
204  scoped.release();
205  }
206  if (data->cond || data->type == TL_UNLOCK)
207  {
208  if (data->cond) /* aborted or timed out */
209  {
210  if (((*data->prev)=data->next)) /* remove from wait-list */
211  {
212  data->next->prev= data->prev;
213  }
214  else
215  {
216  wait->last=data->prev;
217  }
218  data->type= TL_UNLOCK; /* No lock */
219  wake_up_waiters(data->lock);
220  }
221  }
222  else
223  {
224  result= THR_LOCK_SUCCESS;
225  }
226  data->lock->unlock();
227 
228  /* The following must be done after unlock of lock->mutex */
229  boost::mutex::scoped_lock scopedLock(session.getThreadVar()->mutex);
230  session.getThreadVar()->current_mutex= NULL;
231  session.getThreadVar()->current_cond= NULL;
232 
233  return(result);
234 }
235 
236 
237 static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type)
238 {
239  THR_LOCK *lock= data->lock;
240  enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
241  struct st_lock_list *wait_queue;
242  THR_LOCK_DATA *lock_owner;
243 
244  data->next=0;
245  data->cond=0; /* safety */
246  data->type=lock_type;
247  data->owner= owner; /* Must be reset ! */
248  lock->lock();
249  if ((int) lock_type <= (int) TL_READ_NO_INSERT)
250  {
251  /* Request for READ lock */
252  if (lock->write.data)
253  {
254  /* We can allow a read lock even if there is already a write lock
255  on the table in one the following cases:
256  - This thread alread have a write lock on the table
257  - The write lock is TL_WRITE_ALLOW_READ or TL_WRITE_DELAYED
258  and the read lock is TL_READ_HIGH_PRIORITY or TL_READ
259  - The write lock is TL_WRITE_CONCURRENT_INSERT or TL_WRITE_ALLOW_WRITE
260  and the read lock is not TL_READ_NO_INSERT
261  */
262 
263  if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
264  (lock->write.data->type <= TL_WRITE_CONCURRENT_INSERT &&
265  (((int) lock_type <= (int) TL_READ_WITH_SHARED_LOCKS) ||
266  (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT &&
267  lock->write.data->type != TL_WRITE_ALLOW_READ))))
268  { /* Already got a write lock */
269  (*lock->read.last)=data; /* Add to running FIFO */
270  data->prev=lock->read.last;
271  lock->read.last= &data->next;
272  if (lock_type == TL_READ_NO_INSERT)
273  lock->read_no_write_count++;
274  current_global_counters.locks_immediate++;
275  goto end;
276  }
277  if (lock->write.data->type == TL_WRITE_ONLY)
278  {
279  /* We are not allowed to get a READ lock in this case */
280  data->type=TL_UNLOCK;
281  result= THR_LOCK_ABORTED; /* Can't wait for this one */
282  goto end;
283  }
284  }
285  else if (!lock->write_wait.data ||
286  lock->write_wait.data->type <= TL_WRITE_DEFAULT ||
287  have_old_read_lock(lock->read.data, data->owner))
288  { /* No important write-locks */
289  (*lock->read.last)=data; /* Add to running FIFO */
290  data->prev=lock->read.last;
291  lock->read.last= &data->next;
292  if (lock_type == TL_READ_NO_INSERT)
293  lock->read_no_write_count++;
294  current_global_counters.locks_immediate++;
295  goto end;
296  }
297  /*
298  We're here if there is an active write lock or no write
299  lock but a high priority write waiting in the write_wait queue.
300  In the latter case we should yield the lock to the writer.
301  */
302  wait_queue= &lock->read_wait;
303  }
304  else /* Request for WRITE lock */
305  {
306  if (lock_type == TL_WRITE_CONCURRENT_INSERT)
307  data->type=lock_type= thr_upgraded_concurrent_insert_lock;
308 
309  if (lock->write.data) /* If there is a write lock */
310  {
311  if (lock->write.data->type == TL_WRITE_ONLY)
312  {
313  /* Allow lock owner to bypass TL_WRITE_ONLY. */
314  if (!thr_lock_owner_equal(data->owner, lock->write.data->owner))
315  {
316  /* We are not allowed to get a lock in this case */
317  data->type=TL_UNLOCK;
318  result= THR_LOCK_ABORTED; /* Can't wait for this one */
319  goto end;
320  }
321  }
322 
323  /*
324  The following test will not work if the old lock was a
325  TL_WRITE_ALLOW_WRITE, TL_WRITE_ALLOW_READ or TL_WRITE_DELAYED in
326  the same thread, but this will never happen within MySQL.
327  */
328  if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
329  (lock_type == TL_WRITE_ALLOW_WRITE &&
330  !lock->write_wait.data &&
331  lock->write.data->type == TL_WRITE_ALLOW_WRITE))
332  {
333  /*
334  We have already got a write lock or all locks are
335  TL_WRITE_ALLOW_WRITE
336  */
337 
338  (*lock->write.last)=data; /* Add to running fifo */
339  data->prev=lock->write.last;
340  lock->write.last= &data->next;
341  current_global_counters.locks_immediate++;
342  goto end;
343  }
344  }
345  else
346  {
347  if (!lock->write_wait.data)
348  { /* no scheduled write locks */
349  if (!lock->read.data ||
350  (lock_type <= TL_WRITE_CONCURRENT_INSERT &&
351  ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
352  lock_type != TL_WRITE_ALLOW_WRITE) ||
353  !lock->read_no_write_count)))
354  {
355  (*lock->write.last)=data; /* Add as current write lock */
356  data->prev=lock->write.last;
357  lock->write.last= &data->next;
358  current_global_counters.locks_immediate++;
359  goto end;
360  }
361  }
362  }
363  wait_queue= &lock->write_wait;
364  }
365  /*
366  Try to detect a trivial deadlock when using cursors: attempt to
367  lock a table that is already locked by an open cursor within the
368  same connection. lock_owner can be zero if we succumbed to a high
369  priority writer in the write_wait queue.
370  */
371  lock_owner= lock->read.data ? lock->read.data : lock->write.data;
372  if (lock_owner && lock_owner->owner->info == owner->info)
373  {
374  result= THR_LOCK_DEADLOCK;
375  goto end;
376  }
377 
378  /* Can't get lock yet; Wait for it */
379  return(wait_for_lock(session, wait_queue, data));
380 
381 end:
382  lock->unlock();
383 
384  return(result);
385 }
386 
387 
388 static void free_all_read_locks(THR_LOCK *lock, bool using_concurrent_insert)
389 {
390  THR_LOCK_DATA *data= lock->read_wait.data;
391 
392  /* move all locks from read_wait list to read list */
393  (*lock->read.last)=data;
394  data->prev=lock->read.last;
395  lock->read.last=lock->read_wait.last;
396 
397  /* Clear read_wait list */
398  lock->read_wait.last= &lock->read_wait.data;
399 
400  do
401  {
402  boost::condition_variable_any *cond= data->cond;
403  if ((int) data->type == (int) TL_READ_NO_INSERT)
404  {
405  if (using_concurrent_insert)
406  {
407  /*
408  We can't free this lock;
409  Link lock away from read chain back into read_wait chain
410  */
411  if (((*data->prev)=data->next))
412  data->next->prev=data->prev;
413  else
414  lock->read.last=data->prev;
415  *lock->read_wait.last= data;
416  data->prev= lock->read_wait.last;
417  lock->read_wait.last= &data->next;
418  continue;
419  }
420  lock->read_no_write_count++;
421  }
422  data->cond= NULL; /* Mark thread free */
423  cond->notify_one();
424  } while ((data=data->next));
425  *lock->read_wait.last=0;
426  if (!lock->read_wait.data)
427  lock->write_lock_count=0;
428 }
429 
430 /* Unlock lock and free next thread on same lock */
431 
432 static void thr_unlock(THR_LOCK_DATA *data)
433 {
434  THR_LOCK *lock=data->lock;
435  enum thr_lock_type lock_type=data->type;
436  lock->lock();
437 
438  if (((*data->prev)=data->next)) /* remove from lock-list */
439  data->next->prev= data->prev;
440  else if (lock_type <= TL_READ_NO_INSERT)
441  lock->read.last=data->prev;
442  else
443  lock->write.last=data->prev;
444  if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
445  { }
446  else
447  { }
448  if (lock_type == TL_READ_NO_INSERT)
449  lock->read_no_write_count--;
450  data->type=TL_UNLOCK; /* Mark unlocked */
451  wake_up_waiters(lock);
452  lock->unlock();
453 }
454 
455 
464 static void wake_up_waiters(THR_LOCK *lock)
465 {
466  THR_LOCK_DATA *data;
467  enum thr_lock_type lock_type;
468 
469  if (!lock->write.data) /* If no active write locks */
470  {
471  data=lock->write_wait.data;
472  if (!lock->read.data) /* If no more locks in use */
473  {
474  /* Release write-locks with TL_WRITE or TL_WRITE_ONLY priority first */
475  if (data &&
476  (!lock->read_wait.data || lock->read_wait.data->type <= TL_READ_WITH_SHARED_LOCKS))
477  {
478  if (lock->write_lock_count++ > max_write_lock_count)
479  {
480  /* Too many write locks in a row; Release all waiting read locks */
481  lock->write_lock_count=0;
482  if (lock->read_wait.data)
483  {
484  free_all_read_locks(lock,0);
485  goto end;
486  }
487  }
488  for (;;)
489  {
490  if (((*data->prev)=data->next)) /* remove from wait-list */
491  data->next->prev= data->prev;
492  else
493  lock->write_wait.last=data->prev;
494  (*lock->write.last)=data; /* Put in execute list */
495  data->prev=lock->write.last;
496  data->next=0;
497  lock->write.last= &data->next;
498 
499  {
500  boost::condition_variable_any *cond= data->cond;
501  data->cond= NULL; /* Mark thread free */
502  cond->notify_one(); /* Start waiting thred */
503  }
504  if (data->type != TL_WRITE_ALLOW_WRITE ||
505  !lock->write_wait.data ||
506  lock->write_wait.data->type != TL_WRITE_ALLOW_WRITE)
507  break;
508  data=lock->write_wait.data; /* Free this too */
509  }
510  if (data->type >= TL_WRITE)
511  goto end;
512  /* Release possible read locks together with the write lock */
513  }
514  if (lock->read_wait.data)
515  free_all_read_locks(lock,
516  data &&
517  (data->type == TL_WRITE_CONCURRENT_INSERT ||
518  data->type == TL_WRITE_ALLOW_WRITE));
519  }
520  else if (data &&
521  (lock_type=data->type) <= TL_WRITE_CONCURRENT_INSERT &&
522  ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
523  lock_type != TL_WRITE_ALLOW_WRITE) ||
524  !lock->read_no_write_count))
525  {
526  do {
527  boost::condition_variable_any *cond= data->cond;
528  if (((*data->prev)=data->next)) /* remove from wait-list */
529  data->next->prev= data->prev;
530  else
531  lock->write_wait.last=data->prev;
532  (*lock->write.last)=data; /* Put in execute list */
533  data->prev=lock->write.last;
534  lock->write.last= &data->next;
535  data->next=0; /* Only one write lock */
536  data->cond= NULL; /* Mark thread free */
537  cond->notify_one(); /* Start waiting thread */
538  } while (lock_type == TL_WRITE_ALLOW_WRITE &&
539  (data=lock->write_wait.data) &&
540  data->type == TL_WRITE_ALLOW_WRITE);
541  if (lock->read_wait.data)
542  free_all_read_locks(lock,
543  (lock_type == TL_WRITE_CONCURRENT_INSERT ||
544  lock_type == TL_WRITE_ALLOW_WRITE));
545  }
546  else if (!data && lock->read_wait.data)
547  {
548  free_all_read_locks(lock,0);
549  }
550  }
551 end:
552  return;
553 }
554 
555 
556 /*
557 ** Get all locks in a specific order to avoid dead-locks
558 ** Sort acording to lock position and put write_locks before read_locks if
559 ** lock on same lock.
560 */
561 
562 
563 #define LOCK_CMP(A,B) ((unsigned char*) (A->lock) - (uint32_t) ((A)->type) < (unsigned char*) (B->lock)- (uint32_t) ((B)->type))
564 
565 static void sort_locks(THR_LOCK_DATA **data,uint32_t count)
566 {
567  THR_LOCK_DATA **pos,**end,**prev,*tmp;
568 
569  /* Sort locks with insertion sort (fast because almost always few locks) */
570 
571  for (pos=data+1,end=data+count; pos < end ; pos++)
572  {
573  tmp= *pos;
574  if (LOCK_CMP(tmp,pos[-1]))
575  {
576  prev=pos;
577  do {
578  prev[0]=prev[-1];
579  } while (--prev != data && LOCK_CMP(tmp,prev[-1]));
580  prev[0]=tmp;
581  }
582  }
583 }
584 
585 
586 enum enum_thr_lock_result
587 thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
588 {
589  THR_LOCK_DATA **pos,**end;
590  if (count > 1)
591  sort_locks(data,count);
592  /* lock everything */
593  for (pos=data,end=data+count; pos < end ; pos++)
594  {
595  enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
596  if (result != THR_LOCK_SUCCESS)
597  { /* Aborted */
598  thr_multi_unlock(data,(uint32_t) (pos-data));
599  return(result);
600  }
601  }
602  return(THR_LOCK_SUCCESS);
603 }
604 
605  /* free all locks */
606 
607 void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count)
608 {
609  THR_LOCK_DATA **pos,**end;
610 
611  for (pos=data,end=data+count; pos < end ; pos++)
612  {
613  if ((*pos)->type != TL_UNLOCK)
614  thr_unlock(*pos);
615  }
616 }
617 
618 void DrizzleLock::unlock(uint32_t count)
619 {
620  THR_LOCK_DATA **pos,**end;
621 
622  for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
623  {
624  if ((*pos)->type != TL_UNLOCK)
625  thr_unlock(*pos);
626  }
627 }
628 
629 /*
630  Abort all threads waiting for a lock. The lock will be upgraded to
631  TL_WRITE_ONLY to abort any new accesses to the lock
632 */
633 
634 void THR_LOCK::abort_locks()
635 {
636  boost::mutex::scoped_lock scopedLock(mutex);
637 
638  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
639  {
640  local_data->type= TL_UNLOCK; /* Mark killed */
641  /* It's safe to signal the cond first: we're still holding the mutex. */
642  local_data->cond->notify_one();
643  local_data->cond= NULL; /* Removed from list */
644  }
645  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
646  {
647  local_data->type= TL_UNLOCK;
648  local_data->cond->notify_one();
649  local_data->cond= NULL;
650  }
651  read_wait.last= &read_wait.data;
652  write_wait.last= &write_wait.data;
653  read_wait.data= write_wait.data=0;
654  if (write.data)
655  write.data->type=TL_WRITE_ONLY;
656 }
657 
658 
659 /*
660  Abort all locks for specific table/thread combination
661 
662  This is used to abort all locks for a specific thread
663 */
664 
665 bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
666 {
667  bool found= false;
668 
669  boost::mutex::scoped_lock scopedLock(mutex);
670  for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
671  {
672  if (local_data->owner->info->thread_id == thread_id_arg)
673  {
674  local_data->type= TL_UNLOCK; /* Mark killed */
675  /* It's safe to signal the cond first: we're still holding the mutex. */
676  found= true;
677  local_data->cond->notify_one();
678  local_data->cond= 0; /* Removed from list */
679 
680  if (((*local_data->prev)= local_data->next))
681  local_data->next->prev= local_data->prev;
682  else
683  read_wait.last= local_data->prev;
684  }
685  }
686  for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
687  {
688  if (local_data->owner->info->thread_id == thread_id_arg)
689  {
690  local_data->type= TL_UNLOCK;
691  found= true;
692  local_data->cond->notify_one();
693  local_data->cond= NULL;
694 
695  if (((*local_data->prev)= local_data->next))
696  local_data->next->prev= local_data->prev;
697  else
698  write_wait.last= local_data->prev;
699  }
700  }
701  wake_up_waiters(this);
702 
703  return found;
704 }
705 
706 } /* namespace drizzled */
static void wake_up_waiters(THR_LOCK *lock)
Wake up all threads whose pending requests for the lock can be satisfied.
Definition: thr_lock.cc:464
TODO: Rename this file - func.h is stupid.