Drizzled Public API Documentation

sql_update.cc
1 /* Copyright (C) 2000-2006 MySQL AB
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 
17 /*
18  Single table and multi table updates of tables.
19 */
20 
21 #include <config.h>
22 
23 #include <drizzled/sql_select.h>
24 #include <drizzled/error.h>
25 #include <drizzled/probes.h>
26 #include <drizzled/sql_base.h>
27 #include <drizzled/field/epoch.h>
28 #include <drizzled/sql_parse.h>
29 #include <drizzled/optimizer/range.h>
30 #include <drizzled/records.h>
31 #include <drizzled/internal/my_sys.h>
32 #include <drizzled/internal/iocache.h>
33 #include <drizzled/transaction_services.h>
34 #include <drizzled/filesort.h>
35 #include <drizzled/plugin/storage_engine.h>
36 #include <drizzled/key.h>
37 #include <drizzled/sql_lex.h>
38 #include <drizzled/diagnostics_area.h>
39 #include <drizzled/util/test.h>
40 #include <drizzled/statistics_variables.h>
41 #include <drizzled/session/transactions.h>
42 
43 #include <boost/dynamic_bitset.hpp>
44 #include <list>
45 
46 using namespace std;
47 
48 namespace drizzled {
49 
/*
  Prepare the record buffers so that a duplicate key error message can
  print the key value.

  Returns without doing anything unless error is HA_ERR_FOUND_DUPP_KEY and
  the engine has HTON_BIT_PARTIAL_COLUMN_READ set. Otherwise the columns of
  the violated index that are in neither read_set nor write_set are added
  to read_set, the row is re-read by position into the update record, and
  those columns are copied over with copy_from_tmp().

  error   error code that is about to be reported
  table   table the failing row belongs to
*/
62 static void prepare_record_for_error_message(int error, Table *table)
63 {
64  Field **field_p= NULL;
65  Field *field= NULL;
66  uint32_t keynr= 0;
67 
68  /*
69  Only duplicate key errors print the key value.
70  If the storage engine always reads all columns, we already have the value.
71  */
72  if ((error != HA_ERR_FOUND_DUPP_KEY) ||
73  ! (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
74  return;
75 
76  /*
77  Get the number of the offending index.
78  We will see MAX_KEY if the engine cannot determine the affected index.
79  */
80  if ((keynr= table->get_dup_key(error)) >= MAX_KEY)
81  return;
82 
83  /* Create unique_map with all fields used by that index. */
84  boost::dynamic_bitset<> unique_map(table->getShare()->sizeFields()); /* Fields in the offending unique index. */
85  table->mark_columns_used_by_index_no_reset(keynr, unique_map);
86 
87  /* Subtract read_set and write_set. */
88  unique_map-= *table->read_set;
89  unique_map-= *table->write_set;
90 
91  /*
92  If the unique index uses columns that are neither in read_set
93  nor in write_set, we must re-read the record.
94  Otherwise no need to do anything.
95  */
96  if (unique_map.none())
97  return;
98 
99  /* Get identifier of last read record into table->cursor->ref. */
100  table->cursor->position(table->getInsertRecord());
101  /* Add all fields used by unique index to read_set. */
102  *table->read_set|= unique_map;
103  /* Read record that is identified by table->cursor->ref (return value is ignored). */
104  (void) table->cursor->rnd_pos(table->getUpdateRecord(), table->cursor->ref);
105  /* Copy the newly read columns into the new record. */
106  for (field_p= table->getFields(); (field= *field_p); field_p++)
107  {
108  if (unique_map.test(field->position()))
109  {
110  field->copy_from_tmp(table->getShare()->rec_buff_length);
111  }
112  }
113 
114  return;
115 }
116 
117 
118 /*
119  Process usual UPDATE
120 
121  SYNOPSIS
122  update_query()
123  session thread handler
124  fields fields for update
125  values values of fields for update
126  conds WHERE clause expression
127  order_num number of elemen in ORDER BY clause
128  order order_st BY clause list
129  limit limit clause
130  handle_duplicates how to handle duplicates
131 
132  RETURN
133  0 - OK
134  1 - error
135 */
136 
137 int update_query(Session *session, TableList *table_list,
138  List<Item> &fields, List<Item> &values, COND *conds,
139  uint32_t order_num, Order *order,
140  ha_rows limit, enum enum_duplicates,
141  bool ignore)
142 {
143  bool using_limit= limit != HA_POS_ERROR;
144  bool used_key_is_modified;
145  bool transactional_table;
146  int error= 0;
147  uint used_index= MAX_KEY, dup_key_found;
148  bool need_sort= true;
149  ha_rows updated, found;
150  key_map old_covering_keys;
151  Table *table;
152  optimizer::SqlSelect *select= NULL;
153  ReadRecord info;
154  Select_Lex *select_lex= &session->lex().select_lex;
155  uint64_t id;
156  List<Item> all_fields;
157  Session::killed_state_t killed_status= Session::NOT_KILLED;
158 
159  DRIZZLE_UPDATE_START(session->getQueryString()->c_str());
160  if (session->openTablesLock(table_list))
161  {
162  DRIZZLE_UPDATE_DONE(1, 0, 0);
163  return 1;
164  }
165 
166  session->set_proc_info("init");
167  table= table_list->table;
168 
169  /* Calculate "table->covering_keys" based on the WHERE */
170  table->covering_keys= table->getShare()->keys_in_use;
171  table->quick_keys.reset();
172 
173  if (prepare_update(session, table_list, &conds, order_num, order))
174  {
175  DRIZZLE_UPDATE_DONE(1, 0, 0);
176  return 1;
177  }
178 
179  old_covering_keys= table->covering_keys; // Keys used in WHERE
180  /* Check the fields we are going to modify */
181  if (setup_fields_with_no_wrap(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0))
182  {
183  DRIZZLE_UPDATE_DONE(1, 0, 0);
184  return 1;
185  }
186 
187  if (table->timestamp_field)
188  {
189  // Don't set timestamp column if this is modified
190  if (table->timestamp_field->isWriteSet())
191  {
192  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
193  }
194  else
195  {
196  if (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
197  table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH)
198  {
199  table->setWriteSet(table->timestamp_field->position());
200  }
201  }
202  }
203 
204  if (setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0))
205  {
206  free_underlaid_joins(session, select_lex);
207  DRIZZLE_UPDATE_DONE(1, 0, 0);
208 
209  return 1;
210  }
211 
212  if (select_lex->inner_refs_list.size() &&
213  fix_inner_refs(session, all_fields, select_lex, select_lex->ref_pointer_array))
214  {
215  DRIZZLE_UPDATE_DONE(1, 0, 0);
216  return 1;
217  }
218 
219  if (conds)
220  {
221  Item::cond_result cond_value;
222  conds= remove_eq_conds(session, conds, &cond_value);
223  if (cond_value == Item::COND_FALSE)
224  limit= 0; // Impossible WHERE
225  }
226 
227  /*
228  If a timestamp field settable on UPDATE is present then to avoid wrong
229  update force the table handler to retrieve write-only fields to be able
230  to compare records and detect data change.
231  */
232  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
233  table->timestamp_field &&
234  (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
235  table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
236  {
237  *table->read_set|= *table->write_set;
238  }
239  // Don't count on usage of 'only index' when calculating which key to use
240  table->covering_keys.reset();
241 
242  /* Update the table->cursor->stats.records number */
243  table->cursor->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
244 
245  select= optimizer::make_select(table, 0, 0, conds, 0, &error);
246  if (error || !limit ||
247  (select && select->check_quick(session, false, limit)))
248  {
249  delete select;
254  session->main_da().reset_diagnostics_area();
255  free_underlaid_joins(session, select_lex);
256  if (error || session->is_error())
257  {
258  DRIZZLE_UPDATE_DONE(1, 0, 0);
259  return 1;
260  }
261  DRIZZLE_UPDATE_DONE(0, 0, 0);
262  session->my_ok(); // No matching records
263  return 0;
264  }
265  if (!select && limit != HA_POS_ERROR)
266  {
267  if ((used_index= optimizer::get_index_for_order(table, order, limit)) != MAX_KEY)
268  need_sort= false;
269  }
270  /* If running in safe sql mode, don't allow updates without keys */
271  if (table->quick_keys.none())
272  {
273  session->server_status|=SERVER_QUERY_NO_INDEX_USED;
274  }
275 
276  table->mark_columns_needed_for_update();
277 
278  /* Check if we are modifying a key that we are used to search with */
279 
280  if (select && select->quick)
281  {
282  used_index= select->quick->index;
283  used_key_is_modified= (!select->quick->unique_key_range() &&
284  select->quick->is_keys_used(*table->write_set));
285  }
286  else
287  {
288  used_key_is_modified= 0;
289  if (used_index == MAX_KEY) // no index for sort order
290  used_index= table->cursor->key_used_on_scan;
291  if (used_index != MAX_KEY)
292  used_key_is_modified= is_key_used(table, used_index, *table->write_set);
293  }
294 
295 
296  if (used_key_is_modified || order)
297  {
298  /*
299  We can't update table directly; We must first search after all
300  matching rows before updating the table!
301  */
302  if (used_index < MAX_KEY && old_covering_keys.test(used_index))
303  {
304  table->key_read=1;
305  table->mark_columns_used_by_index(used_index);
306  }
307  else
308  {
309  table->use_all_columns();
310  }
311 
312  /* note: We avoid sorting avoid if we sort on the used index */
313  if (order && (need_sort || used_key_is_modified))
314  {
315  /*
316  Doing an order_st BY; Let filesort find and sort the rows we are going
317  to update
318  NOTE: filesort will call table->prepare_for_position()
319  */
320  uint32_t length= 0;
321  SortField *sortorder;
322  ha_rows examined_rows;
323  FileSort filesort(*session);
324 
325  table->sort.io_cache= new internal::io_cache_st;
326  sortorder=make_unireg_sortorder(order, &length, NULL);
327 
328  if ((table->sort.found_records= filesort.run(table, sortorder, length, select, limit, 1, examined_rows)) == HA_POS_ERROR)
329  goto err;
330  /*
331  Filesort has already found and selected the rows we want to update,
332  so we don't need the where clause
333  */
334  safe_delete(select);
335  }
336  else
337  {
338  /*
339  We are doing a search on a key that is updated. In this case
340  we go trough the matching rows, save a pointer to them and
341  update these in a separate loop based on the pointer.
342  */
343 
344  internal::io_cache_st tempfile;
345  if (tempfile.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
346  {
347  goto err;
348  }
349 
350  /* If quick select is used, initialize it before retrieving rows. */
351  if (select && select->quick && select->quick->reset())
352  goto err;
353  table->cursor->try_semi_consistent_read(1);
354 
355  /*
356  When we get here, we have one of the following options:
357  A. used_index == MAX_KEY
358  This means we should use full table scan, and start it with
359  init_read_record call
360  B. used_index != MAX_KEY
361  B.1 quick select is used, start the scan with init_read_record
362  B.2 quick select is not used, this is full index scan (with LIMIT)
363  Full index scan must be started with init_read_record_idx
364  */
365 
366  if (used_index == MAX_KEY || (select && select->quick))
367  {
368  if ((error= info.init_read_record(session, table, select, 0, true)))
369  goto err;
370  }
371  else
372  {
373  if ((error= info.init_read_record_idx(session, table, 1, used_index)))
374  goto err;
375  }
376 
377  session->set_proc_info("Searching rows for update");
378  ha_rows tmp_limit= limit;
379 
380  while (not(error= info.read_record(&info)) && not session->getKilled())
381  {
382  if (!(select && select->skip_record()))
383  {
384  if (table->cursor->was_semi_consistent_read())
385  continue; /* repeat the read of the same row if it still exists */
386 
387  table->cursor->position(table->getInsertRecord());
388  if (tempfile.write(table->cursor->ref, table->cursor->ref_length))
389  {
390  error=1;
391  break;
392  }
393  if (!--limit && using_limit)
394  {
395  error= -1;
396  break;
397  }
398  }
399  else
400  table->cursor->unlock_row();
401  }
402  if (session->getKilled() && not error)
403  error= 1; // Aborted
404  limit= tmp_limit;
405  table->cursor->try_semi_consistent_read(0);
406  info.end_read_record();
407 
408  /* Change select to use tempfile */
409  if (select)
410  {
411  safe_delete(select->quick);
412  if (select->free_cond)
413  delete select->cond;
414  select->cond=0;
415  }
416  else
417  {
418  select= new optimizer::SqlSelect();
419  select->head=table;
420  }
421  if (tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
422  error=1;
423  // Read row ptrs from this cursor
424  memcpy(select->file, &tempfile, sizeof(tempfile));
425  if (error >= 0)
426  goto err;
427  }
428  if (table->key_read)
429  table->restore_column_maps_after_mark_index();
430  }
431 
432  if (ignore)
433  table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
434 
435  if (select && select->quick && select->quick->reset())
436  goto err;
437  table->cursor->try_semi_consistent_read(1);
438  if ((error= info.init_read_record(session, table, select, 0, true)))
439  {
440  goto err;
441  }
442 
443  updated= found= 0;
444  /*
445  * Per the SQL standard, inserting NULL into a NOT NULL
446  * field requires an error to be thrown.
447  *
448  * @NOTE
449  *
450  * NULL check and handling occurs in field_conv.cc
451  */
452  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
453  session->cuted_fields= 0L;
454  session->set_proc_info("Updating");
455 
456  transactional_table= table->cursor->has_transactions();
457  session->setAbortOnWarning(test(!ignore));
458 
459  /*
460  Assure that we can use position()
461  if we need to create an error message.
462  */
463  if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ))
464  table->prepare_for_position();
465 
466  while (not (error=info.read_record(&info)) && not session->getKilled())
467  {
468  if (not (select && select->skip_record()))
469  {
470  if (table->cursor->was_semi_consistent_read())
471  continue; /* repeat the read of the same row if it still exists */
472 
473  table->storeRecord();
474  if (fill_record(session, fields, values))
475  break;
476 
477  found++;
478 
479  if (! table->records_are_comparable() || table->compare_records())
480  {
481  /* Non-batched update */
482  error= table->cursor->updateRecord(table->getUpdateRecord(),
483  table->getInsertRecord());
484 
485  table->auto_increment_field_not_null= false;
486 
487  if (!error || error == HA_ERR_RECORD_IS_THE_SAME)
488  {
489  if (error != HA_ERR_RECORD_IS_THE_SAME)
490  updated++;
491  else
492  error= 0;
493  }
494  else if (! ignore ||
495  table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
496  {
497  /*
498  If (ignore && error is ignorable) we don't have to
499  do anything; otherwise...
500  */
501  myf flags= 0;
502 
503  if (table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
504  flags|= ME_FATALERROR; /* Other handler errors are fatal */
505 
506  prepare_record_for_error_message(error, table);
507  table->print_error(error,MYF(flags));
508  error= 1;
509  break;
510  }
511  }
512 
513  if (!--limit && using_limit)
514  {
515  error= -1; // Simulate end of cursor
516  break;
517  }
518  }
519  else
520  table->cursor->unlock_row();
521  session->row_count++;
522  }
523  dup_key_found= 0;
524  /*
525  Caching the killed status to pass as the arg to query event constuctor;
526 
527  It's assumed that if an error was set in combination with an effective
528  killed status then the error is due to killing.
529  */
530  killed_status= session->getKilled(); // get the status of the volatile
531 
532  error= (killed_status == Session::NOT_KILLED)? error : 1;
533 
534  updated-= dup_key_found;
535  table->cursor->try_semi_consistent_read(0);
536 
537  if (!transactional_table && updated > 0)
538  session->transaction.stmt.markModifiedNonTransData();
539 
540  info.end_read_record();
541  delete select;
542  session->set_proc_info("end");
543  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
544 
545  /*
546  error < 0 means really no error at all: we processed all rows until the
547  last one without error. error > 0 means an error (e.g. unique key
548  violation and no IGNORE or REPLACE). error == 0 is also an error (if
549  preparing the record or invoking before triggers fails). See
550  autocommitOrRollback(error>=0) and return(error>=0) below.
551  */
552  if ((error < 0) || session->transaction.stmt.hasModifiedNonTransData())
553  {
554  if (session->transaction.stmt.hasModifiedNonTransData())
555  session->transaction.all.markModifiedNonTransData();
556  }
557  assert(transactional_table || !updated || session->transaction.stmt.hasModifiedNonTransData());
558  free_underlaid_joins(session, select_lex);
559 
560  /* If LAST_INSERT_ID(X) was used, report X */
561  id= session->arg_of_last_insert_id_function ?
563 
564  if (error < 0)
565  {
566  char buff[STRING_BUFFER_USUAL_SIZE];
567  snprintf(buff, sizeof(buff), ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
568  (ulong) session->cuted_fields);
569  session->row_count_func= updated;
574  session->main_da().reset_diagnostics_area();
575  session->my_ok((ulong) session->rowCount(), found, id, buff);
576  session->status_var.updated_row_count+= session->rowCount();
577  }
578  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL; /* calc cuted fields */
579  session->setAbortOnWarning(false);
580  DRIZZLE_UPDATE_DONE((error >= 0 || session->is_error()), found, updated);
581  return ((error >= 0 || session->is_error()) ? 1 : 0);
582 
583 err:
584  if (error != 0)
585  table->print_error(error,MYF(0));
586 
587  delete select;
588  free_underlaid_joins(session, select_lex);
589  if (table->key_read)
590  {
591  table->key_read=0;
592  table->cursor->extra(HA_EXTRA_NO_KEYREAD);
593  }
594  session->setAbortOnWarning(false);
595 
596  DRIZZLE_UPDATE_DONE(1, 0, 0);
597  return 1;
598 }
599 
600 /*
601  Prepare items in UPDATE statement
602 
603  SYNOPSIS
604  prepare_update()
605  session - thread handler
606  table_list - global/local table list
607  conds - conditions
608  order_num - number of ORDER BY list entries
609  order - ORDER BY clause list
610 
 NOTES
 Sets up the tables and the WHERE conditions, allocates the reference
 array for order_num entries, resolves the ORDER BY list, and finally
 rejects the statement with ER_UPDATE_TABLE_USED when unique_table()
 finds table_list among the tables that follow it in the global list.

611  RETURN VALUE
612  false OK
613  true error
614 */
615 bool prepare_update(Session *session, TableList *table_list,
616  Item **conds, uint32_t order_num, Order *order)
617 {
 /* Local scratch list; it is passed to setup_order() as both fields and all_fields */
618  List<Item> all_fields;
619  Select_Lex *select_lex= &session->lex().select_lex;
620 
 /* NOTE(review): presumably this disallows aggregate functions here - confirm */
621  session->lex().allow_sum_func= 0;
622 
623  if (setup_tables_and_check_access(session, &select_lex->context, &select_lex->top_join_list, table_list, &select_lex->leaf_tables, false) ||
624  session->setup_conds(table_list, conds))
625  return true;
626  select_lex->setup_ref_array(session, order_num);
627  if (setup_order(session, select_lex->ref_pointer_array, table_list, all_fields, all_fields, order))
628  return true;
629 
630  /* Check that the table we are updating is not also used in a subselect */
631  if (unique_table(table_list, table_list->next_global))
632  {
633  my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
634  return true;
635  }
636 
637  return false;
638 }
639 
640 } /* namespace drizzled */
void my_ok(ha_rows affected_rows=0, ha_rows found_rows_arg=0, uint64_t passed_id=0, const char *message=NULL)
Definition: session.cc:1877
uint32_t row_count
Definition: session.h:447
void set_proc_info(const char *info)
Definition: session.h:609
bool compare_records()
Definition: table.cc:1422
ha_rows cuted_fields
Definition: session.h:408
virtual bool is_keys_used(const boost::dynamic_bitset<> &fields)
Definition: range.cc:4125
TODO: Rename this file - func.h is stupid.
COND * remove_eq_conds(Session *session, COND *cond, Item::cond_result *cond_value)
Definition: sql_select.cc:2851
Table * table
opened table
Definition: table_list.h:145
virtual bool was_semi_consistent_read()
Definition: cursor.h:379
QuickSelectInterface * quick
Definition: range.h:277
uint64_t first_successful_insert_id_in_prev_stmt
Definition: session.h:379
ha_rows run(Table *table, SortField *sortorder, uint32_t s_length, optimizer::SqlSelect *select, ha_rows max_rows, bool sort_positions, ha_rows &examined_rows)
Definition: filesort.cc:187
int update_query(Session *session, TableList *tables, List< Item > &fields, List< Item > &values, COND *conds, uint32_t order_num, Order *order, ha_rows limit, enum enum_duplicates handle_duplicates, bool ignore)
Definition: sql_update.cc:137
static void prepare_record_for_error_message(int error, Table *table)
Definition: sql_update.cc:62
int init_read_record_idx(Session *session, Table *table, bool print_error, uint32_t idx) __attribute__((warn_unused_result))
Definition: records.cc:53
uint32_t get_dup_key(int error) const
Definition: table.h:556
int setup_order(Session *session, Item **ref_pointer_array, TableList *tables, List< Item > &fields, List< Item > &all_fields, Order *order)
Definition: sql_select.cc:5908
field::Epoch * timestamp_field
Definition: table.h:144
void free_underlaid_joins(Session *session, Select_Lex *select)
Definition: sql_select.cc:6655
Cursor * cursor
Definition: table.h:68
bool records_are_comparable()
Definition: table.cc:1403
virtual bool is_fatal_error(int error, uint32_t flags)
Definition: cursor.cc:190
virtual void try_semi_consistent_read(bool)
Definition: cursor.h:386
int64_t row_count_func
Definition: session.h:401
uint32_t ref_length
Definition: cursor.h:159
internal::io_cache_st * file
Definition: range.h:280
bool reinit_io_cache(cache_type type_arg, my_off_t seek_offset, bool use_async_io, bool clear_cache)
Reset the cache.
Definition: mf_iocache.cc:280
bool openTablesLock(TableList *)
Definition: session.cc:1767
bool is_error() const
Definition: session.cc:1871