binlog.cc   binlog.cc 
skipping to change at line 163 skipping to change at line 163
if (unlikely(setup_thread_globals(m_original_thd))) if (unlikely(setup_thread_globals(m_original_thd)))
DBUG_ASSERT(0); // Out of memory?! DBUG_ASSERT(0); // Out of memory?!
#endif #endif
} }
/** /**
Attach the POSIX thread to a session. Attach the POSIX thread to a session.
*/ */
int attach_to(THD *thd) int attach_to(THD *thd)
{ {
/*
Simulate session attach error.
*/
DBUG_EXECUTE_IF("simulate_session_attach_error",
{
if (rand() % 3 == 0)
return 1;
};);
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
if (PSI_server) if (PSI_server)
PSI_server->set_thread(thd_get_psi(thd)); PSI_server->set_thread(thd_get_psi(thd));
#endif #endif
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
if (unlikely(setup_thread_globals(thd))) if (unlikely(setup_thread_globals(thd)))
{ {
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
if (PSI_server) if (PSI_server)
PSI_server->set_thread(m_saved_psi); PSI_server->set_thread(m_saved_psi);
skipping to change at line 1703 skipping to change at line 1711
if (linfo->index_file_offset < purge_offset) if (linfo->index_file_offset < purge_offset)
linfo->fatal = (linfo->index_file_offset != 0); linfo->fatal = (linfo->index_file_offset != 0);
else else
linfo->index_file_offset -= purge_offset; linfo->index_file_offset -= purge_offset;
mysql_mutex_unlock(&linfo->lock); mysql_mutex_unlock(&linfo->lock);
} }
} }
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
} }
static bool log_in_use(const char* log_name) static int log_in_use(const char* log_name)
{ {
size_t log_name_len = strlen(log_name) + 1; size_t log_name_len = strlen(log_name) + 1;
bool result = 0; int thread_count=0;
mysql_mutex_lock(&LOCK_thread_count); mysql_mutex_lock(&LOCK_thread_count);
Thread_iterator it= global_thread_list_begin(); Thread_iterator it= global_thread_list_begin();
Thread_iterator end= global_thread_list_end(); Thread_iterator end= global_thread_list_end();
for (; it != end; ++it) for (; it != end; ++it)
{ {
LOG_INFO* linfo; LOG_INFO* linfo;
if ((linfo = (*it)->current_linfo)) if ((linfo = (*it)->current_linfo))
{ {
mysql_mutex_lock(&linfo->lock); mysql_mutex_lock(&linfo->lock);
result = !memcmp(log_name, linfo->log_file_name, log_name_len); if(!memcmp(log_name, linfo->log_file_name, log_name_len))
{
thread_count++;
sql_print_warning("file %s was not purged because it was being read
"
"by thread number %llu", log_name,
(ulonglong)(*it)->thread_id);
}
mysql_mutex_unlock(&linfo->lock); mysql_mutex_unlock(&linfo->lock);
if (result)
break;
} }
} }
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
return result; return thread_count;
} }
static bool purge_error_message(THD* thd, int res) static bool purge_error_message(THD* thd, int res)
{ {
uint errcode; uint errcode;
if ((errcode= purge_log_get_error_code(res)) != 0) if ((errcode= purge_log_get_error_code(res)) != 0)
{ {
my_message(errcode, ER(errcode), MYF(0)); my_message(errcode, ER(errcode), MYF(0));
return TRUE; return TRUE;
skipping to change at line 1920 skipping to change at line 1932
{ {
my_ok(thd); my_ok(thd);
return FALSE; return FALSE;
} }
mysql_bin_log.make_log_name(search_file_name, to_log); mysql_bin_log.make_log_name(search_file_name, to_log);
return purge_error_message(thd, return purge_error_message(thd,
mysql_bin_log.purge_logs(search_file_name, fal se, mysql_bin_log.purge_logs(search_file_name, fal se,
true/*need_lock_index =true*/, true/*need_lock_index =true*/,
true/*need_update_thr eads=true*/, true/*need_update_thr eads=true*/,
NULL)); NULL, false));
} }
/** /**
Execute a PURGE BINARY LOGS BEFORE <date> command. Execute a PURGE BINARY LOGS BEFORE <date> command.
@param thd Pointer to THD object for the client thread executing the @param thd Pointer to THD object for the client thread executing the
statement. statement.
@param purge_time Date before which logs should be purged. @param purge_time Date before which logs should be purged.
skipping to change at line 1942 skipping to change at line 1954
@retval TRUE failure @retval TRUE failure
*/ */
bool purge_master_logs_before_date(THD* thd, time_t purge_time) bool purge_master_logs_before_date(THD* thd, time_t purge_time)
{ {
if (!mysql_bin_log.is_open()) if (!mysql_bin_log.is_open())
{ {
my_ok(thd); my_ok(thd);
return 0; return 0;
} }
return purge_error_message(thd, return purge_error_message(thd,
mysql_bin_log.purge_logs_before_date(purge_tim mysql_bin_log.purge_logs_before_date(purge_tim
e)); e,
false));
} }
#endif /* EMBEDDED_LIBRARY */ #endif /* EMBEDDED_LIBRARY */
/* /*
Helper function to get the error code of the query to be binlogged. Helper function to get the error code of the query to be binlogged.
*/ */
int query_error_code(THD *thd, bool not_killed) int query_error_code(THD *thd, bool not_killed)
{ {
int error; int error;
skipping to change at line 2132 skipping to change at line 2145
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
if ((file=open_binlog_file(&log, linfo.log_file_name, &errmsg)) < 0) if ((file=open_binlog_file(&log, linfo.log_file_name, &errmsg)) < 0)
goto err; goto err;
/* /*
to account binlog event header size to account binlog event header size
*/ */
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER; thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
DEBUG_SYNC(thd, "after_show_binlog_event_found_file");
mysql_mutex_lock(log_lock); mysql_mutex_lock(log_lock);
/* /*
open_binlog_file() sought to position 4. open_binlog_file() sought to position 4.
Read the first event in case it's a Format_description_log_event, to Read the first event in case it's a Format_description_log_event, to
know the format. If there's no such event, we are 3.23 or 4.x. This know the format. If there's no such event, we are 3.23 or 4.x. This
code, like before, can't read 3.23 binlogs. code, like before, can't read 3.23 binlogs.
This code will fail on a mixed relay log (one which has Format_desc t hen This code will fail on a mixed relay log (one which has Format_desc t hen
Rotate then Format_desc). Rotate then Format_desc).
*/ */
skipping to change at line 2290 skipping to change at line 2305
{ {
DBUG_ENTER("cleanup"); DBUG_ENTER("cleanup");
if (inited) if (inited)
{ {
inited= 0; inited= 0;
close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT); close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT);
mysql_mutex_destroy(&LOCK_log); mysql_mutex_destroy(&LOCK_log);
mysql_mutex_destroy(&LOCK_index); mysql_mutex_destroy(&LOCK_index);
mysql_mutex_destroy(&LOCK_commit); mysql_mutex_destroy(&LOCK_commit);
mysql_mutex_destroy(&LOCK_sync); mysql_mutex_destroy(&LOCK_sync);
mysql_mutex_destroy(&LOCK_xids);
mysql_cond_destroy(&update_cond); mysql_cond_destroy(&update_cond);
my_atomic_rwlock_destroy(&m_prep_xids_lock); my_atomic_rwlock_destroy(&m_prep_xids_lock);
mysql_cond_destroy(&m_prep_xids_cond); mysql_cond_destroy(&m_prep_xids_cond);
stage_manager.deinit(); stage_manager.deinit();
} }
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
void MYSQL_BIN_LOG::init_pthread_objects() void MYSQL_BIN_LOG::init_pthread_objects()
{ {
MYSQL_LOG::init_pthread_objects(); MYSQL_LOG::init_pthread_objects();
mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW); mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW);
mysql_mutex_init(m_key_LOCK_commit, &LOCK_commit, MY_MUTEX_INIT_FAST); mysql_mutex_init(m_key_LOCK_commit, &LOCK_commit, MY_MUTEX_INIT_FAST);
mysql_mutex_init(m_key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST); mysql_mutex_init(m_key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(m_key_LOCK_xids, &LOCK_xids, MY_MUTEX_INIT_FAST);
mysql_cond_init(m_key_update_cond, &update_cond, 0); mysql_cond_init(m_key_update_cond, &update_cond, 0);
my_atomic_rwlock_init(&m_prep_xids_lock); my_atomic_rwlock_init(&m_prep_xids_lock);
mysql_cond_init(m_key_prep_xids_cond, &m_prep_xids_cond, NULL); mysql_cond_init(m_key_prep_xids_cond, &m_prep_xids_cond, NULL);
stage_manager.init( stage_manager.init(
#ifdef HAVE_PSI_INTERFACE #ifdef HAVE_PSI_INTERFACE
m_key_LOCK_flush_queue, m_key_LOCK_flush_queue,
m_key_LOCK_sync_queue, m_key_LOCK_sync_queue,
m_key_LOCK_commit_queue, m_key_LOCK_commit_queue,
m_key_LOCK_done, m_key_COND_done m_key_LOCK_done, m_key_COND_done
#endif #endif
skipping to change at line 2506 skipping to change at line 2523
#ifndef DBUG_OFF #ifndef DBUG_OFF
char* prev_buffer= prev_gtids_ev->get_str(NULL, NULL); char* prev_buffer= prev_gtids_ev->get_str(NULL, NULL);
DBUG_PRINT("info", ("Got Previous_gtids from file '%s': Gtid_set='%s' .", DBUG_PRINT("info", ("Got Previous_gtids from file '%s': Gtid_set='%s' .",
filename, prev_buffer)); filename, prev_buffer));
my_free(prev_buffer); my_free(prev_buffer);
#endif #endif
break; break;
} }
case GTID_LOG_EVENT: case GTID_LOG_EVENT:
{ {
DBUG_EXECUTE_IF("inject_fault_bug16502579", {
DBUG_PRINT("debug", ("GTID_LOG_EVENT found. Injected
ret=NO_GTIDS."));
ret=NO_GTIDS;
});
if (ret != GOT_GTIDS) if (ret != GOT_GTIDS)
{ {
if (ret != GOT_PREVIOUS_GTIDS) if (ret != GOT_PREVIOUS_GTIDS)
// should not happen {
my_error(ER_MASTER_FATAL_ERROR_READING_BINLOG, MYF(0)); /*
Since this routine is run on startup, there may not be a
THD instance. Therefore, ER(X) cannot be used.
*/
const char* msg_fmt= (current_thd != NULL) ?
ER(ER_BINLOG_LOGICAL_CORRUPTION) :
ER_DEFAULT(ER_BINLOG_LOGICAL_CORRUPTION);
my_printf_error(ER_BINLOG_LOGICAL_CORRUPTION,
msg_fmt, MYF(0),
filename,
"The first global transaction identifier was read
, but "
"no other information regarding identifiers exist
ing "
"on the previous log files was found.");
ret= ERROR, done= true;
break;
}
else else
ret= GOT_GTIDS; ret= GOT_GTIDS;
} }
/* /*
When all_gtids==NULL, we just check if the binary log contains When all_gtids==NULL, we just check if the binary log contains
at least one Gtid_log_event, so that we can distinguish the at least one Gtid_log_event, so that we can distinguish the
return values GOT_GTID and GOT_PREVIOUS_GTIDS. We don't need return values GOT_GTID and GOT_PREVIOUS_GTIDS. We don't need
to read anything else from the binary log. to read anything else from the binary log.
*/ */
if (all_gtids == NULL) if (all_gtids == NULL)
skipping to change at line 3329 skipping to change at line 3365
if (!log_name || if (!log_name ||
(log_name_len == fname_len-1 && full_fname[log_name_len] == '\n' && (log_name_len == fname_len-1 && full_fname[log_name_len] == '\n' &&
!memcmp(full_fname, full_log_name, log_name_len))) !memcmp(full_fname, full_log_name, log_name_len)))
{ {
DBUG_PRINT("info", ("Found log file entry")); DBUG_PRINT("info", ("Found log file entry"));
full_fname[fname_len-1]= 0; // remove last \n full_fname[fname_len-1]= 0; // remove last \n
linfo->index_file_start_offset= offset; linfo->index_file_start_offset= offset;
linfo->index_file_offset = my_b_tell(&index_file); linfo->index_file_offset = my_b_tell(&index_file);
break; break;
} }
linfo->entry_index++;
} }
end: end:
if (need_lock_index) if (need_lock_index)
mysql_mutex_unlock(&LOCK_index); mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/** /**
Find the position in the log-index-file for the given log name. Find the position in the log-index-file for the given log name.
skipping to change at line 3422 skipping to change at line 3459
1 error 1 error
*/ */
bool MYSQL_BIN_LOG::reset_logs(THD* thd) bool MYSQL_BIN_LOG::reset_logs(THD* thd)
{ {
LOG_INFO linfo; LOG_INFO linfo;
bool error=0; bool error=0;
int err; int err;
const char* save_name; const char* save_name;
DBUG_ENTER("reset_logs"); DBUG_ENTER("reset_logs");
/*
Flush logs for storage engines, so that the last transaction
is fsynced inside storage engines.
*/
if (ha_flush_logs(NULL))
DBUG_RETURN(1);
ha_reset_logs(thd); ha_reset_logs(thd);
/* /*
The following mutex is needed to ensure that no threads call The following mutex is needed to ensure that no threads call
'delete thd' as we would then risk missing a 'rollback' from this 'delete thd' as we would then risk missing a 'rollback' from this
thread. If the transaction involved MyISAM tables, it should go thread. If the transaction involved MyISAM tables, it should go
into binlog even on rollback. into binlog even on rollback.
*/ */
mysql_mutex_lock(&LOCK_thread_count); mysql_mutex_lock(&LOCK_thread_count);
skipping to change at line 3746 skipping to change at line 3790
/* Store where we are in the new file for the execution thread */ /* Store where we are in the new file for the execution thread */
rli->flush_info(TRUE); rli->flush_info(TRUE);
DBUG_EXECUTE_IF("crash_before_purge_logs", DBUG_SUICIDE();); DBUG_EXECUTE_IF("crash_before_purge_logs", DBUG_SUICIDE(););
mysql_mutex_lock(&rli->log_space_lock); mysql_mutex_lock(&rli->log_space_lock);
rli->relay_log.purge_logs(to_purge_if_included, included, rli->relay_log.purge_logs(to_purge_if_included, included,
false/*need_lock_index=false*/, false/*need_lock_index=false*/,
false/*need_update_threads=false*/, false/*need_update_threads=false*/,
&rli->log_space_total); &rli->log_space_total, true);
// Tell the I/O thread to take the relay_log_space_limit into account // Tell the I/O thread to take the relay_log_space_limit into account
rli->ignore_log_space_limit= 0; rli->ignore_log_space_limit= 0;
mysql_mutex_unlock(&rli->log_space_lock); mysql_mutex_unlock(&rli->log_space_lock);
/* /*
Ok to broadcast after the critical region as there is no risk of Ok to broadcast after the critical region as there is no risk of
the mutex being destroyed by this thread later - this helps save the mutex being destroyed by this thread later - this helps save
context switches context switches
*/ */
mysql_cond_broadcast(&rli->log_space_cond); mysql_cond_broadcast(&rli->log_space_cond);
skipping to change at line 3855 skipping to change at line 3899
/** /**
Remove all logs before the given log from disk and from the index file. Remove all logs before the given log from disk and from the index file.
@param to_log Delete all log file name before this file. @param to_log Delete all log file name before this file.
@param included If true, to_log is deleted too. @param included If true, to_log is deleted too.
@param need_lock_index @param need_lock_index
@param need_update_threads If we want to update the log coordinates of @param need_update_threads If we want to update the log coordinates of
all threads. False for relay logs, true otherw ise. all threads. False for relay logs, true otherw ise.
@param freed_log_space If not null, decrement this variable of @param freed_log_space If not null, decrement this variable of
the amount of log space freed the amount of log space freed
@param auto_purge True if this is an automatic purge.
@note @note
If any of the logs before the deleted one is in use, If any of the logs before the deleted one is in use,
only purge logs up to this one. only purge logs up to this one.
@retval @retval
0 ok 0 ok
@retval @retval
LOG_INFO_EOF to_log not found LOG_INFO_EOF to_log not found
LOG_INFO_EMFILE too many files opened LOG_INFO_EMFILE too many files opened
LOG_INFO_FATAL if any other than ENOENT error from LOG_INFO_FATAL if any other than ENOENT error from
mysql_file_stat() or mysql_file_delete() mysql_file_stat() or mysql_file_delete()
*/ */
int MYSQL_BIN_LOG::purge_logs(const char *to_log, int MYSQL_BIN_LOG::purge_logs(const char *to_log,
bool included, bool included,
bool need_lock_index, bool need_lock_index,
bool need_update_threads, bool need_update_threads,
ulonglong *decrease_log_space) ulonglong *decrease_log_space,
bool auto_purge)
{ {
int error= 0; int error= 0, no_of_log_files_to_purge= 0, no_of_log_files_purged= 0;
int no_of_threads_locking_log= 0;
bool exit_loop= 0; bool exit_loop= 0;
LOG_INFO log_info; LOG_INFO log_info;
THD *thd= current_thd; THD *thd= current_thd;
DBUG_ENTER("purge_logs"); DBUG_ENTER("purge_logs");
DBUG_PRINT("info",("to_log= %s",to_log)); DBUG_PRINT("info",("to_log= %s",to_log));
if (need_lock_index) if (need_lock_index)
mysql_mutex_lock(&LOCK_index); mysql_mutex_lock(&LOCK_index);
else else
mysql_mutex_assert_owner(&LOCK_index); mysql_mutex_assert_owner(&LOCK_index);
if ((error=find_log_pos(&log_info, to_log, false/*need_lock_index=false*/ ))) if ((error=find_log_pos(&log_info, to_log, false/*need_lock_index=false*/ )))
{ {
sql_print_error("MYSQL_BIN_LOG::purge_logs was called with file %s not " sql_print_error("MYSQL_BIN_LOG::purge_logs was called with file %s not "
"listed in the index.", to_log); "listed in the index.", to_log);
goto err; goto err;
} }
no_of_log_files_to_purge= log_info.entry_index;
if ((error= open_purge_index_file(TRUE))) if ((error= open_purge_index_file(TRUE)))
{ {
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to sync the index fil e."); sql_print_error("MYSQL_BIN_LOG::purge_logs failed to sync the index fil e.");
goto err; goto err;
} }
/* /*
File name exists in index file; delete until we find this file File name exists in index file; delete until we find this file
or a file that is used. or a file that is used.
*/ */
if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/) )) if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/) ))
goto err; goto err;
while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)) &&
!is_active(log_info.log_file_name) && while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)))
!log_in_use(log_info.log_file_name))
{ {
if(is_active(log_info.log_file_name))
{
if(!auto_purge)
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARN_PURGE_LOG_IS_ACTIVE,
ER(ER_WARN_PURGE_LOG_IS_ACTIVE),
log_info.log_file_name);
break;
}
if ((no_of_threads_locking_log= log_in_use(log_info.log_file_name)))
{
if(!auto_purge)
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARN_PURGE_LOG_IN_USE,
ER(ER_WARN_PURGE_LOG_IN_USE),
log_info.log_file_name, no_of_threads_locking_
log,
no_of_log_files_purged, no_of_log_files_to_purg
e);
break;
}
no_of_log_files_purged++;
if ((error= register_purge_index_entry(log_info.log_file_name))) if ((error= register_purge_index_entry(log_info.log_file_name)))
{ {
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to copy %s to regis ter file.", sql_print_error("MYSQL_BIN_LOG::purge_logs failed to copy %s to regis ter file.",
log_info.log_file_name); log_info.log_file_name);
goto err; goto err;
} }
if (find_next_log(&log_info, false/*need_lock_index=false*/) || exit_lo op) if (find_next_log(&log_info, false/*need_lock_index=false*/) || exit_lo op)
break; break;
} }
skipping to change at line 3939 skipping to change at line 4009
if ((error=remove_logs_from_index(&log_info, need_update_threads))) if ((error=remove_logs_from_index(&log_info, need_update_threads)))
{ {
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to update the index f ile"); sql_print_error("MYSQL_BIN_LOG::purge_logs failed to update the index f ile");
goto err; goto err;
} }
// Update gtid_state->lost_gtids // Update gtid_state->lost_gtids
if (gtid_mode > 0 && !is_relay_log) if (gtid_mode > 0 && !is_relay_log)
{ {
global_sid_lock->wrlock(); global_sid_lock->wrlock();
if (init_gtid_sets(NULL, error= init_gtid_sets(NULL,
const_cast<Gtid_set *>(gtid_state->get_lost_gtids()) , const_cast<Gtid_set *>(gtid_state->get_lost_gtids()) ,
opt_master_verify_checksum, opt_master_verify_checksum,
false/*false=don't need lock*/)) false/*false=don't need lock*/);
goto err;
global_sid_lock->unlock(); global_sid_lock->unlock();
if (error)
goto err;
} }
DBUG_EXECUTE_IF("crash_purge_critical_after_update_index", DBUG_SUICIDE() ;); DBUG_EXECUTE_IF("crash_purge_critical_after_update_index", DBUG_SUICIDE() ;);
err: err:
int error_index= 0, close_error_index= 0;
/* Read each entry from purge_index_file and delete the file. */ /* Read each entry from purge_index_file and delete the file. */
if (is_inited_purge_index_file() && if (is_inited_purge_index_file() &&
(error= purge_index_entry(thd, decrease_log_space, false/*need_lock_i ndex=false*/))) (error_index= purge_index_entry(thd, decrease_log_space, false/*need_ lock_index=false*/)))
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to process registered files" sql_print_error("MYSQL_BIN_LOG::purge_logs failed to process registered files"
" that would be purged."); " that would be purged.");
close_purge_index_file();
close_error_index= close_purge_index_file();
DBUG_EXECUTE_IF("crash_purge_non_critical_after_update_index", DBUG_SUICI DE();); DBUG_EXECUTE_IF("crash_purge_non_critical_after_update_index", DBUG_SUICI DE(););
if (need_lock_index) if (need_lock_index)
mysql_mutex_unlock(&LOCK_index); mysql_mutex_unlock(&LOCK_index);
/*
Error codes from purge logs take precedence.
Then error codes from purging the index entry.
Finally, error codes from closing the purge index file.
*/
error= error ? error : (error_index ? error_index :
close_error_index);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
int MYSQL_BIN_LOG::set_purge_index_file_name(const char *base_file_name) int MYSQL_BIN_LOG::set_purge_index_file_name(const char *base_file_name)
{ {
int error= 0; int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::set_purge_index_file_name"); DBUG_ENTER("MYSQL_BIN_LOG::set_purge_index_file_name");
if (fn_format(purge_index_file_name, base_file_name, mysql_data_home, if (fn_format(purge_index_file_name, base_file_name, mysql_data_home,
".~rec~", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH | ".~rec~", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH |
MY_REPLACE_EXT)) == NULL) MY_REPLACE_EXT)) == NULL)
skipping to change at line 4245 skipping to change at line 4328
err: err:
DBUG_RETURN(error); DBUG_RETURN(error);
} }
/** /**
Remove all logs before the given file date from disk and from the Remove all logs before the given file date from disk and from the
index file. index file.
@param thd Thread pointer @param thd Thread pointer
@param purge_time Delete all log files before given date. @param purge_time Delete all log files before given date.
@param auto_purge True if this is an automatic purge.
@note @note
If any of the logs before the deleted one is in use, If any of the logs before the deleted one is in use,
only purge logs up to this one. only purge logs up to this one.
@retval @retval
0 ok 0 ok
@retval @retval
LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated
LOG_INFO_FATAL if any other than ENOENT error from LOG_INFO_FATAL if any other than ENOENT error from
mysql_file_stat() or mysql_file_delete() mysql_file_stat() or mysql_file_delete()
*/ */
int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time, bool auto_purg e)
{ {
int error; int error;
char to_log[FN_REFLEN]; int no_of_threads_locking_log= 0, no_of_log_files_purged= 0;
bool log_is_active= false, log_is_in_use= false;
char to_log[FN_REFLEN], copy_log_in_use[FN_REFLEN];
LOG_INFO log_info; LOG_INFO log_info;
MY_STAT stat_area; MY_STAT stat_area;
THD *thd= current_thd; THD *thd= current_thd;
DBUG_ENTER("purge_logs_before_date"); DBUG_ENTER("purge_logs_before_date");
mysql_mutex_lock(&LOCK_index); mysql_mutex_lock(&LOCK_index);
to_log[0]= 0; to_log[0]= 0;
if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/) )) if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/) ))
goto err; goto err;
while (strcmp(log_file_name, log_info.log_file_name) && while (!(log_is_active= is_active(log_info.log_file_name)))
!is_active(log_info.log_file_name) &&
!log_in_use(log_info.log_file_name))
{ {
if ((no_of_threads_locking_log= log_in_use(log_info.log_file_name)))
{
if (!auto_purge)
{
log_is_in_use= true;
strcpy(copy_log_in_use, log_info.log_file_name);
}
break;
}
no_of_log_files_purged++;
if (!mysql_file_stat(m_key_file_log, if (!mysql_file_stat(m_key_file_log,
log_info.log_file_name, &stat_area, MYF(0))) log_info.log_file_name, &stat_area, MYF(0)))
{ {
if (my_errno == ENOENT) if (my_errno == ENOENT)
{ {
/* /*
It's not fatal if we can't stat a log file that does not exist. It's not fatal if we can't stat a log file that does not exist.
*/ */
my_errno= 0; my_errno= 0;
} }
skipping to change at line 4325 skipping to change at line 4420
strmake(to_log, strmake(to_log,
log_info.log_file_name, log_info.log_file_name,
sizeof(log_info.log_file_name) - 1); sizeof(log_info.log_file_name) - 1);
else else
break; break;
} }
if (find_next_log(&log_info, false/*need_lock_index=false*/)) if (find_next_log(&log_info, false/*need_lock_index=false*/))
break; break;
} }
if (log_is_active)
{
if(!auto_purge)
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARN_PURGE_LOG_IS_ACTIVE,
ER(ER_WARN_PURGE_LOG_IS_ACTIVE),
log_info.log_file_name);
}
if (log_is_in_use)
{
int no_of_log_files_to_purge= no_of_log_files_purged+1;
while (strcmp(log_file_name, log_info.log_file_name))
{
if (mysql_file_stat(m_key_file_log, log_info.log_file_name,
&stat_area, MYF(0)))
{
if (stat_area.st_mtime < purge_time)
no_of_log_files_to_purge++;
else
break;
}
if (find_next_log(&log_info, false/*need_lock_index=false*/))
{
no_of_log_files_to_purge++;
break;
}
}
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARN_PURGE_LOG_IN_USE,
ER(ER_WARN_PURGE_LOG_IN_USE),
copy_log_in_use, no_of_threads_locking_log,
no_of_log_files_purged, no_of_log_files_to_purge);
}
error= (to_log[0] ? purge_logs(to_log, true, error= (to_log[0] ? purge_logs(to_log, true,
false/*need_lock_index=false*/, false/*need_lock_index=false*/,
true/*need_update_threads=true*/, true/*need_update_threads=true*/,
(ulonglong *) 0) : 0); (ulonglong *) 0, auto_purge) : 0);
err: err:
mysql_mutex_unlock(&LOCK_index); mysql_mutex_unlock(&LOCK_index);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
/** /**
Create a new log file name. Create a new log file name.
skipping to change at line 4416 skipping to change at line 4548
if (!is_open()) if (!is_open())
{ {
DBUG_PRINT("info",("log is closed")); DBUG_PRINT("info",("log is closed"));
DBUG_RETURN(error); DBUG_RETURN(error);
} }
if (need_lock_log) if (need_lock_log)
mysql_mutex_lock(&LOCK_log); mysql_mutex_lock(&LOCK_log);
else else
mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_lock(&LOCK_commit); DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
DEBUG_SYNC(current_thd, "before_rotate_binlog"););
mysql_mutex_lock(&LOCK_xids);
/* /*
We need to ensure that the number of prepared XIDs are 0. We need to ensure that the number of prepared XIDs are 0.
If m_prep_xids is not zero: If m_prep_xids is not zero:
- We release the LOCK_commit lock to allow sessions to commit, - We wait for storage engine commit, hence decrease m_prep_xids
hence decrease m_prep_xids
- We keep the LOCK_log to block new transactions from being - We keep the LOCK_log to block new transactions from being
written to the binary log. written to the binary log.
*/ */
while (get_prep_xids() > 0) while (get_prep_xids() > 0)
mysql_cond_wait(&m_prep_xids_cond, &LOCK_commit); mysql_cond_wait(&m_prep_xids_cond, &LOCK_xids);
mysql_mutex_unlock(&LOCK_xids);
mysql_mutex_lock(&LOCK_index); mysql_mutex_lock(&LOCK_index);
if ((error= ha_flush_logs(0))) if (DBUG_EVALUATE_IF("expire_logs_always", 0, 1)
&& (error= ha_flush_logs(NULL)))
goto end; goto end;
mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_commit);
mysql_mutex_assert_owner(&LOCK_index); mysql_mutex_assert_owner(&LOCK_index);
/* Reuse old name if not binlog and not update log */ /* Reuse old name if not binlog and not update log */
new_name_ptr= name; new_name_ptr= name;
/* /*
If user hasn't specified an extension, generate a new log name If user hasn't specified an extension, generate a new log name
We have to do this here and not in open as we want to store the We have to do this here and not in open as we want to store the
new file name in the current binary log file. new file name in the current binary log file.
*/ */
skipping to change at line 4557 skipping to change at line 4692
close(LOG_CLOSE_INDEX); close(LOG_CLOSE_INDEX);
sql_print_error("Could not open %s for logging (error %d). " sql_print_error("Could not open %s for logging (error %d). "
"Turning logging off for the whole duration " "Turning logging off for the whole duration "
"of the MySQL server process. To turn it on " "of the MySQL server process. To turn it on "
"again: fix the cause, shutdown the MySQL " "again: fix the cause, shutdown the MySQL "
"server and restart it.", "server and restart it.",
new_name_ptr, errno); new_name_ptr, errno);
} }
mysql_mutex_unlock(&LOCK_index); mysql_mutex_unlock(&LOCK_index);
mysql_mutex_unlock(&LOCK_commit);
if (need_lock_log) if (need_lock_log)
mysql_mutex_unlock(&LOCK_log); mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
/** /**
Called after an event has been written to the relay log by the IO Called after an event has been written to the relay log by the IO
thread. This flushes and possibly syncs the file (according to the thread. This flushes and possibly syncs the file (according to the
skipping to change at line 4981 skipping to change at line 5115
@retval @retval
nonzero - error in rotating routine. nonzero - error in rotating routine.
*/ */
void MYSQL_BIN_LOG::purge() void MYSQL_BIN_LOG::purge()
{ {
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (expire_logs_days) if (expire_logs_days)
{ {
DEBUG_SYNC(current_thd, "at_purge_logs_before_date"); DEBUG_SYNC(current_thd, "at_purge_logs_before_date");
time_t purge_time= my_time(0) - expire_logs_days*24*60*60; time_t purge_time= my_time(0) - expire_logs_days*24*60*60;
DBUG_EXECUTE_IF("expire_logs_always",
{ purge_time= my_time(0);});
if (purge_time >= 0) if (purge_time >= 0)
{ {
purge_logs_before_date(purge_time); /*
Flush logs for storage engines, so that the last transaction
is fsynced inside storage engines.
*/
ha_flush_logs(NULL);
purge_logs_before_date(purge_time, true);
} }
} }
#endif #endif
} }
/** /**
The method is a shortcut of @c rotate() and @c purge(). The method is a shortcut of @c rotate() and @c purge().
LOCK_log is acquired prior to rotate and is released after it. LOCK_log is acquired prior to rotate and is released after it.
@param force_rotate caller can request the log rotation @param force_rotate caller can request the log rotation
skipping to change at line 5406 skipping to change at line 5547
'cache' needs to be reinitialized after this functions returns. 'cache' needs to be reinitialized after this functions returns.
*/ */
bool MYSQL_BIN_LOG::write_cache(THD *thd, binlog_cache_data *cache_data) bool MYSQL_BIN_LOG::write_cache(THD *thd, binlog_cache_data *cache_data)
{ {
DBUG_ENTER("MYSQL_BIN_LOG::write_cache(THD *, binlog_cache_data *, bool)" ); DBUG_ENTER("MYSQL_BIN_LOG::write_cache(THD *, binlog_cache_data *, bool)" );
IO_CACHE *cache= &cache_data->cache_log; IO_CACHE *cache= &cache_data->cache_log;
bool incident= cache_data->has_incident(); bool incident= cache_data->has_incident();
DBUG_EXECUTE_IF("simulate_binlog_flush_error",
{
if (rand() % 3 == 0)
{
write_error=1;
goto err;
}
};);
mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_log);
DBUG_ASSERT(is_open()); DBUG_ASSERT(is_open());
if (likely(is_open())) // Should always be true if (likely(is_open())) // Should always be true
{ {
/* /*
We only bother to write to the binary log if there is anything We only bother to write to the binary log if there is anything
to write. to write.
*/ */
if (my_b_tell(cache) > 0) if (my_b_tell(cache) > 0)
skipping to change at line 5466 skipping to change at line 5616
DBUG_RETURN(0); DBUG_RETURN(0);
err: err:
if (!write_error) if (!write_error)
{ {
char errbuf[MYSYS_STRERROR_SIZE]; char errbuf[MYSYS_STRERROR_SIZE];
write_error= 1; write_error= 1;
sql_print_error(ER(ER_ERROR_ON_WRITE), name, sql_print_error(ER(ER_ERROR_ON_WRITE), name,
errno, my_strerror(errbuf, sizeof(errbuf), errno)); errno, my_strerror(errbuf, sizeof(errbuf), errno));
} }
thd->commit_error= THD::CE_FLUSH_ERROR;
DBUG_RETURN(1); DBUG_RETURN(1);
} }
/** /**
Wait until we get a signal that the relay log has been updated. Wait until we get a signal that the relay log has been updated.
@param[in] thd Thread variable @param[in] thd Thread variable
@param[in] timeout a pointer to a timespec; @param[in] timeout a pointer to a timespec;
NULL means to wait w/o timeout. NULL means to wait w/o timeout.
skipping to change at line 6016 skipping to change at line 6168
bool wrote_xid= false; bool wrote_xid= false;
int error= cache_mngr->flush(thd, &bytes, &wrote_xid); int error= cache_mngr->flush(thd, &bytes, &wrote_xid);
if (!error && bytes > 0) if (!error && bytes > 0)
{ {
/* /*
Note that set_trans_pos does not copy the file name. See Note that set_trans_pos does not copy the file name. See
this function documentation for more info. this function documentation for more info.
*/ */
thd->set_trans_pos(log_file_name, my_b_tell(&log_file)); thd->set_trans_pos(log_file_name, my_b_tell(&log_file));
if (wrote_xid) if (wrote_xid)
{ inc_prep_xids(thd);
inc_prep_xids();
thd->transaction.flags.xid_written= true;
}
} }
DBUG_PRINT("debug", ("bytes: %llu", bytes)); DBUG_PRINT("debug", ("bytes: %llu", bytes));
return std::make_pair(error, bytes); return std::make_pair(error, bytes);
} }
/** /**
Execute the flush stage. Execute the flush stage.
@param total_bytes_var Pointer to variable that will be set to total @param total_bytes_var Pointer to variable that will be set to total
number of bytes flushed, or NULL. number of bytes flushed, or NULL.
skipping to change at line 6045 skipping to change at line 6194
@return Error code on error, zero on success @return Error code on error, zero on success
*/ */
int int
MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var, MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
bool *rotate_var, bool *rotate_var,
THD **out_queue_var) THD **out_queue_var)
{ {
DBUG_ASSERT(total_bytes_var && rotate_var && out_queue_var); DBUG_ASSERT(total_bytes_var && rotate_var && out_queue_var);
my_off_t total_bytes= 0; my_off_t total_bytes= 0;
int flush_error= 0; int flush_error= 1;
mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_log);
my_atomic_rwlock_rdlock(&opt_binlog_max_flush_queue_time_lock); my_atomic_rwlock_rdlock(&opt_binlog_max_flush_queue_time_lock);
const ulonglong max_udelay= my_atomic_load32(&opt_binlog_max_flush_queue_ time); const ulonglong max_udelay= my_atomic_load32(&opt_binlog_max_flush_queue_ time);
my_atomic_rwlock_rdunlock(&opt_binlog_max_flush_queue_time_lock); my_atomic_rwlock_rdunlock(&opt_binlog_max_flush_queue_time_lock);
const ulonglong start_utime= max_udelay > 0 ? my_micro_time() : 0; const ulonglong start_utime= max_udelay > 0 ? my_micro_time() : 0;
/* /*
First we read the queue until it either is empty or the difference First we read the queue until it either is empty or the difference
between the time we started and the current time is too large. between the time we started and the current time is too large.
skipping to change at line 6068 skipping to change at line 6217
beginning of the out queue. beginning of the out queue.
*/ */
bool has_more= true; bool has_more= true;
THD *first_seen= NULL; THD *first_seen= NULL;
while ((max_udelay == 0 || my_micro_time() < start_utime + max_udelay) && has_more) while ((max_udelay == 0 || my_micro_time() < start_utime + max_udelay) && has_more)
{ {
std::pair<bool,THD*> current= stage_manager.pop_front(Stage_manager::FL USH_STAGE); std::pair<bool,THD*> current= stage_manager.pop_front(Stage_manager::FL USH_STAGE);
std::pair<int,my_off_t> result= flush_thread_caches(current.second); std::pair<int,my_off_t> result= flush_thread_caches(current.second);
has_more= current.first; has_more= current.first;
total_bytes+= result.second; total_bytes+= result.second;
if (flush_error == 0) if (flush_error == 1)
flush_error= result.first; flush_error= result.first;
if (first_seen == NULL) if (first_seen == NULL)
first_seen= current.second; first_seen= current.second;
} }
/* /*
Either the queue is empty, or we ran out of time. If we ran out of Either the queue is empty, or we ran out of time. If we ran out of
time, we have to fetch the entire queue (and flush it) since time, we have to fetch the entire queue (and flush it) since
otherwise the next batch will not have a leader. otherwise the next batch will not have a leader.
*/ */
if (has_more) if (has_more)
{ {
THD *queue= stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE); THD *queue= stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE);
for (THD *head= queue ; head ; head = head->next_to_commit) for (THD *head= queue ; head ; head = head->next_to_commit)
{ {
std::pair<int,my_off_t> result= flush_thread_caches(head); std::pair<int,my_off_t> result= flush_thread_caches(head);
total_bytes+= result.second; total_bytes+= result.second;
if (flush_error == 0) if (flush_error == 1)
flush_error= result.first; flush_error= result.first;
} }
if (first_seen == NULL) if (first_seen == NULL)
first_seen= queue; first_seen= queue;
} }
*out_queue_var= first_seen; *out_queue_var= first_seen;
*total_bytes_var= total_bytes; *total_bytes_var= total_bytes;
if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t) max_size) if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t) max_size)
*rotate_var= true; *rotate_var= true;
skipping to change at line 6112 skipping to change at line 6261
This function commit an entire queue of sessions starting with the This function commit an entire queue of sessions starting with the
session in @c first. If there were an error in the flushing part of session in @c first. If there were an error in the flushing part of
the ordered commit, the error code is passed in and all the threads the ordered commit, the error code is passed in and all the threads
are marked accordingly (but not committed). are marked accordingly (but not committed).
@see MYSQL_BIN_LOG::ordered_commit @see MYSQL_BIN_LOG::ordered_commit
@param thd The "master" thread @param thd The "master" thread
@param first First thread in the queue of threads to commit @param first First thread in the queue of threads to commit
@param flush_error Error code from flush operation.
*/ */
void void
MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first, MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first)
int flush_error)
{ {
mysql_mutex_assert_owner(&LOCK_commit); mysql_mutex_assert_owner(&LOCK_commit);
Thread_excursion excursion(thd); Thread_excursion excursion(thd);
#ifndef DBUG_OFF #ifndef DBUG_OFF
thd->transaction.flags.ready_preempt= 1; // formality by the leader thd->transaction.flags.ready_preempt= 1; // formality by the leader
#endif #endif
for (THD *head= first ; head ; head = head->next_to_commit) for (THD *head= first ; head ; head = head->next_to_commit)
{ {
DBUG_PRINT("debug", ("Thread ID: %lu, commit_error: %d, flags.pending: %s", DBUG_PRINT("debug", ("Thread ID: %lu, commit_error: %d, flags.pending: %s",
head->thread_id, head->commit_error, head->thread_id, head->commit_error,
skipping to change at line 6140 skipping to change at line 6287
If flushing failed, set commit_error for the session, skip the If flushing failed, set commit_error for the session, skip the
transaction and proceed with the next transaction instead. This transaction and proceed with the next transaction instead. This
will mark all threads as failed, since the flush failed. will mark all threads as failed, since the flush failed.
If flush succeeded, attach to the session and commit it in the If flush succeeded, attach to the session and commit it in the
engines. engines.
*/ */
#ifndef DBUG_OFF #ifndef DBUG_OFF
stage_manager.clear_preempt_status(head); stage_manager.clear_preempt_status(head);
#endif #endif
if (flush_error != 0) if (head->commit_error != THD::CE_NONE)
head->commit_error= flush_error; ;
else if (int error= excursion.attach_to(head)) else if (excursion.attach_to(head))
head->commit_error= error; {
head->commit_error= THD::CE_COMMIT_ERROR;
sql_print_error("Out of memory while attaching to session thread "
"during the group commit phase.");
}
else else
{ {
bool all= head->transaction.flags.real_commit; bool all= head->transaction.flags.real_commit;
if (head->transaction.flags.commit_low) if (head->transaction.flags.commit_low)
{ {
/* head is parked to have exited append() */ /* head is parked to have exited append() */
DBUG_ASSERT(head->transaction.flags.ready_preempt); DBUG_ASSERT(head->transaction.flags.ready_preempt);
/*
if (int error= ha_commit_low(head, all)) storage engine commit
head->commit_error= error; */
else if (head->transaction.flags.xid_written) if (ha_commit_low(head, all, false))
dec_prep_xids(); head->commit_error= THD::CE_COMMIT_ERROR;
} }
DBUG_PRINT("debug", ("commit_error: %d, flags.pending: %s", DBUG_PRINT("debug", ("commit_error: %d, flags.pending: %s",
head->commit_error, head->commit_error,
YESNO(head->transaction.flags.pending))); YESNO(head->transaction.flags.pending)));
} }
/*
Decrement the prepared XID counter after storage engine commit.
We also need decrement the prepared XID when encountering a
flush error or session attach error for avoiding 3-way deadlock
among user thread, rotate thread and dump thread.
*/
if (head->transaction.flags.xid_written)
dec_prep_xids(head);
}
}
/**
Process after commit for a sequence of sessions.
@param thd The "master" thread
@param first First thread in the queue of threads to commit
*/
void
MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first)
{
Thread_excursion excursion(thd);
for (THD *head= first; head; head= head->next_to_commit)
{
if (head->transaction.flags.run_hooks &&
head->commit_error == THD::CE_NONE)
{
if (excursion.attach_to(head))
{
head->commit_error= THD::CE_COMMIT_ERROR;
sql_print_error("Out of memory while attaching to session thread "
"during the group commit phase.");
}
if (head->commit_error == THD::CE_NONE)
{
bool all= head->transaction.flags.real_commit;
(void) RUN_HOOK(transaction, after_commit, (head, all));
/*
When after_commit finished for the transaction, clear the run_hoo
ks flag.
This allow other parts of the system to check if after_commit was
called.
*/
head->transaction.flags.run_hooks= false;
}
}
} }
} }
#ifndef DBUG_OFF #ifndef DBUG_OFF
/** Names for the stages. */ /** Names for the stages. */
static const char* g_stage_name[] = { static const char* g_stage_name[] = {
"FLUSH", "FLUSH",
"SYNC", "SYNC",
"COMMIT", "COMMIT",
}; };
skipping to change at line 6284 skipping to change at line 6479
if (enter_stage(thd, Thread_queue::FLUSH_STAGE, thd, &LOCK_log)) if (enter_stage(thd, Thread_queue::FLUSH_STAGE, thd, &LOCK_log))
return finish_commit(thd); return finish_commit(thd);
@endcode @endcode
@return Error code if the session commit failed, or zero on @return Error code if the session commit failed, or zero on
success. success.
*/ */
int int
MYSQL_BIN_LOG::finish_commit(THD *thd) MYSQL_BIN_LOG::finish_commit(THD *thd)
{ {
if (thd->commit_error == 0 && thd->transaction.flags.commit_low) if (thd->transaction.flags.commit_low)
{ {
const bool all= thd->transaction.flags.real_commit; const bool all= thd->transaction.flags.real_commit;
thd->commit_error= ha_commit_low(thd, all); /*
storage engine commit
*/
if (thd->commit_error == THD::CE_NONE &&
ha_commit_low(thd, all, false))
thd->commit_error= THD::CE_COMMIT_ERROR;
/*
Decrement the prepared XID counter after storage engine commit
*/
if (thd->transaction.flags.xid_written) if (thd->transaction.flags.xid_written)
dec_prep_xids(); dec_prep_xids(thd);
/*
If commit succeeded, we call the after_commit hook
*/
if (thd->commit_error == THD::CE_NONE)
{
(void) RUN_HOOK(transaction, after_commit, (thd, all));
thd->transaction.flags.run_hooks= false;
}
} }
else if (thd->transaction.flags.xid_written)
dec_prep_xids(thd);
thd->variables.gtid_next.set_undefined(); thd->variables.gtid_next.set_undefined();
/* /*
Remove committed GTID from owned_gtids, it was already logged on Remove committed GTID from owned_gtids, it was already logged on
MYSQL_BIN_LOG::write_cache(). MYSQL_BIN_LOG::write_cache().
*/ */
global_sid_lock->rdlock(); global_sid_lock->rdlock();
gtid_state->update_on_commit(thd); gtid_state->update_on_commit(thd);
global_sid_lock->unlock(); global_sid_lock->unlock();
DBUG_ASSERT(thd->commit_error || !thd->transaction.flags.commit_low); DBUG_ASSERT(thd->commit_error || !thd->transaction.flags.run_hooks);
DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized()); DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized());
DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d",
thd->thread_id, thd->commit_error)); thd->thread_id, thd->commit_error));
return thd->commit_error; return thd->commit_error;
} }
/** /**
Flush and commit the transaction. Flush and commit the transaction.
This will execute an ordered flush and commit of all outstanding This will execute an ordered flush and commit of all outstanding
skipping to change at line 6378 skipping to change at line 6591
Notes: Notes:
- It would be good if we could keep transaction coordinator - It would be good if we could keep transaction coordinator
log-specific data out of the THD structure, but that is not the log-specific data out of the THD structure, but that is not the
case right now. case right now.
- Everything in the transaction structure is reset when calling - Everything in the transaction structure is reset when calling
ha_commit_low since that calls st_transaction::cleanup. ha_commit_low since that calls st_transaction::cleanup.
*/ */
thd->transaction.flags.pending= true; thd->transaction.flags.pending= true;
thd->commit_error= 0; thd->commit_error= THD::CE_NONE;
thd->next_to_commit= NULL; thd->next_to_commit= NULL;
thd->durability_property= HA_IGNORE_DURABILITY; thd->durability_property= HA_IGNORE_DURABILITY;
thd->transaction.flags.real_commit= all; thd->transaction.flags.real_commit= all;
thd->transaction.flags.xid_written= false; thd->transaction.flags.xid_written= false;
thd->transaction.flags.commit_low= !skip_commit; thd->transaction.flags.commit_low= !skip_commit;
thd->transaction.flags.run_hooks= !skip_commit;
#ifndef DBUG_OFF #ifndef DBUG_OFF
/* /*
The group commit Leader may have to wait for follower whose transactio n The group commit Leader may have to wait for follower whose transactio n
is not ready to be preempted. Initially the status is pessimistic. is not ready to be preempted. Initially the status is pessimistic.
Preemption guarding logics is necessary only when DBUG_ON is set. Preemption guarding logics is necessary only when DBUG_ON is set.
It won't be required for the dbug-off case as long as the follower won 't It won't be required for the dbug-off case as long as the follower won 't
execute any thread-specific write access code in this method, which is execute any thread-specific write access code in this method, which is
the case as of current. the case as of current.
*/ */
thd->transaction.flags.ready_preempt= 0; thd->transaction.flags.ready_preempt= 0;
skipping to change at line 6478 skipping to change at line 6692
if (opt_binlog_order_commits) if (opt_binlog_order_commits)
{ {
if (change_stage(thd, Stage_manager::COMMIT_STAGE, if (change_stage(thd, Stage_manager::COMMIT_STAGE,
final_queue, &LOCK_sync, &LOCK_commit)) final_queue, &LOCK_sync, &LOCK_commit))
{ {
DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d",
thd->thread_id, thd->commit_error)); thd->thread_id, thd->commit_error));
DBUG_RETURN(finish_commit(thd)); DBUG_RETURN(finish_commit(thd));
} }
THD *commit_queue= stage_manager.fetch_queue_for(Stage_manager::COMMIT_ STAGE); THD *commit_queue= stage_manager.fetch_queue_for(Stage_manager::COMMIT_ STAGE);
process_commit_stage_queue(thd, commit_queue, flush_error); DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
DEBUG_SYNC(thd, "before_process_commit_stage_queue"););
process_commit_stage_queue(thd, commit_queue);
mysql_mutex_unlock(&LOCK_commit); mysql_mutex_unlock(&LOCK_commit);
/*
Process after_commit after LOCK_commit is released for avoiding
3-way deadlock among user thread, rotate thread and dump thread.
*/
process_after_commit_stage_queue(thd, commit_queue);
final_queue= commit_queue; final_queue= commit_queue;
} }
else else
mysql_mutex_unlock(&LOCK_sync); mysql_mutex_unlock(&LOCK_sync);
/* Commit done so signal all waiting threads */ /* Commit done so signal all waiting threads */
stage_manager.signal_done(final_queue); stage_manager.signal_done(final_queue);
/* /*
Finish the commit before executing a rotate, or run the risk of a Finish the commit before executing a rotate, or run the risk of a
deadlock. We don't need the return value here since it is in deadlock. We don't need the return value here since it is in
thd->commit_error, which is returned below. thd->commit_error, which is returned below.
*/ */
(void) finish_commit(thd); (void) finish_commit(thd);
/* /*
If we need to rotate, we do it now. If we need to rotate, we do it without commit error.
Otherwise the thd->commit_error will be possibly reset.
*/ */
if (do_rotate) if (do_rotate && thd->commit_error == THD::CE_NONE)
{ {
/* /*
We can force the rotate since we did the check in Do not force the rotate as several consecutive groups may
flush_session_queue(). Giving "false" would have the same request unnecessary rotations.
result, but will do the check again.
NOTE: Run purge_logs wo/ holding LOCK_log because it does not NOTE: Run purge_logs wo/ holding LOCK_log because it does not
need the mutex. Otherwise causes various deadlocks. need the mutex. Otherwise causes various deadlocks.
NOTE: The LOCK_commit is necessary when doing a rotate, but that
is grabbed inside new_file_impl().
*/ */
DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE();); DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE(););
DEBUG_SYNC(thd, "ready_to_do_rotation");
bool check_purge= false; bool check_purge= false;
mysql_mutex_lock(&LOCK_log); mysql_mutex_lock(&LOCK_log);
int error= rotate(true, &check_purge); int error= rotate(false, &check_purge);
mysql_mutex_unlock(&LOCK_log); mysql_mutex_unlock(&LOCK_log);
if (!error && check_purge) if (!error && check_purge)
purge(); purge();
else else
thd->commit_error= error; thd->commit_error= THD::CE_COMMIT_ERROR;
} }
DBUG_RETURN(thd->commit_error); DBUG_RETURN(thd->commit_error);
} }
/** /**
MYSQLD server recovers from last crashed binlog. MYSQLD server recovers from last crashed binlog.
@param log IO_CACHE of the crashed binlog. @param log IO_CACHE of the crashed binlog.
@param fdle Format_description_log_event of the crashed binlog. @param fdle Format_description_log_event of the crashed binlog.
@param valid_pos The position of the last valid transaction or @param valid_pos The position of the last valid transaction or
skipping to change at line 7469 skipping to change at line 7688
add_to_binlog_accessed_dbs(""); add_to_binlog_accessed_dbs("");
break; break;
} }
if (!is_current_stmt_binlog_format_row()) if (!is_current_stmt_binlog_format_row())
add_to_binlog_accessed_dbs(table->db); add_to_binlog_accessed_dbs(table->db);
} }
} }
DBUG_PRINT("info", ("decision: logging in %s format", DBUG_PRINT("info", ("decision: logging in %s format",
is_current_stmt_binlog_format_row() ? is_current_stmt_binlog_format_row() ?
"ROW" : "STATEMENT")); "ROW" : "STATEMENT"));
if (variables.binlog_format == BINLOG_FORMAT_ROW &&
(lex->sql_command == SQLCOM_UPDATE ||
lex->sql_command == SQLCOM_UPDATE_MULTI ||
lex->sql_command == SQLCOM_DELETE ||
lex->sql_command == SQLCOM_DELETE_MULTI))
{
String table_names;
/*
Generate a warning for UPDATE/DELETE statements that modify a
BLACKHOLE table, as row events are not logged in row format.
*/
for (TABLE_LIST *table= tables; table; table= table->next_global)
{
if (table->placeholder())
continue;
if (table->table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB &&
table->lock_type >= TL_WRITE_ALLOW_WRITE)
{
table_names.append(table->table_name);
table_names.append(",");
}
}
if (!table_names.is_empty())
{
bool is_update= (lex->sql_command == SQLCOM_UPDATE ||
lex->sql_command == SQLCOM_UPDATE_MULTI);
/*
Replace the last ',' with '.' for table_names
*/
table_names.replace(table_names.length()-1, 1, ".", 1);
push_warning_printf(this, Sql_condition::WARN_LEVEL_WARN,
WARN_ON_BLOCKHOLE_IN_RBR,
ER(WARN_ON_BLOCKHOLE_IN_RBR),
is_update ? "UPDATE" : "DELETE",
table_names.c_ptr());
}
}
} }
#ifndef DBUG_OFF #ifndef DBUG_OFF
else else
DBUG_PRINT("info", ("decision: no logging since " DBUG_PRINT("info", ("decision: no logging since "
"mysql_bin_log.is_open() = %d " "mysql_bin_log.is_open() = %d "
"and (options & OPTION_BIN_LOG) = 0x%llx " "and (options & OPTION_BIN_LOG) = 0x%llx "
"and binlog_format = %lu " "and binlog_format = %lu "
"and binlog_filter->db_ok(db) = %d", "and binlog_filter->db_ok(db) = %d",
mysql_bin_log.is_open(), mysql_bin_log.is_open(),
(variables.option_bits & OPTION_BIN_LOG), (variables.option_bits & OPTION_BIN_LOG),
skipping to change at line 7943 skipping to change at line 8200
/** /**
Remove from read_set spurious columns. The write_set has been Remove from read_set spurious columns. The write_set has been
handled before in table->mark_columns_needed_for_update. handled before in table->mark_columns_needed_for_update.
*/ */
DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", tabl e->read_set); DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", tabl e->read_set);
THD *thd= table->in_use; THD *thd= table->in_use;
/** /**
if there is a primary key in the table (ie, user declared PK or a if there is a primary key in the table (ie, user declared PK or a
non-null unique index) and we dont want to ship the entire image. non-null unique index) and we dont want to ship the entire image,
and the handler involved supports this.
*/ */
if (table->s->primary_key < MAX_KEY && if (table->s->primary_key < MAX_KEY &&
(thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL)) (thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) &&
!ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW
_OPT))
{ {
/** /**
Just to be sure that tmp_set is currently not in use as Just to be sure that tmp_set is currently not in use as
the read_set already. the read_set already.
*/ */
DBUG_ASSERT(table->read_set != &table->tmp_set); DBUG_ASSERT(table->read_set != &table->tmp_set);
bitmap_clear_all(&table->tmp_set); bitmap_clear_all(&table->tmp_set);
switch(thd->variables.binlog_row_image) switch(thd->variables.binlog_row_image)
 End of changes. 74 change blocks. 
73 lines changed or deleted 341 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/