binlog.cc | binlog.cc | |||
---|---|---|---|---|
skipping to change at line 163 | skipping to change at line 163 | |||
if (unlikely(setup_thread_globals(m_original_thd))) | if (unlikely(setup_thread_globals(m_original_thd))) | |||
DBUG_ASSERT(0); // Out of memory?! | DBUG_ASSERT(0); // Out of memory?! | |||
#endif | #endif | |||
} | } | |||
/** | /** | |||
Attach the POSIX thread to a session. | Attach the POSIX thread to a session. | |||
*/ | */ | |||
int attach_to(THD *thd) | int attach_to(THD *thd) | |||
{ | { | |||
/* | ||||
Simulate session attach error. | ||||
*/ | ||||
DBUG_EXECUTE_IF("simulate_session_attach_error", | ||||
{ | ||||
if (rand() % 3 == 0) | ||||
return 1; | ||||
};); | ||||
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE | #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE | |||
if (PSI_server) | if (PSI_server) | |||
PSI_server->set_thread(thd_get_psi(thd)); | PSI_server->set_thread(thd_get_psi(thd)); | |||
#endif | #endif | |||
#ifndef EMBEDDED_LIBRARY | #ifndef EMBEDDED_LIBRARY | |||
if (unlikely(setup_thread_globals(thd))) | if (unlikely(setup_thread_globals(thd))) | |||
{ | { | |||
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE | #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE | |||
if (PSI_server) | if (PSI_server) | |||
PSI_server->set_thread(m_saved_psi); | PSI_server->set_thread(m_saved_psi); | |||
skipping to change at line 1703 | skipping to change at line 1711 | |||
if (linfo->index_file_offset < purge_offset) | if (linfo->index_file_offset < purge_offset) | |||
linfo->fatal = (linfo->index_file_offset != 0); | linfo->fatal = (linfo->index_file_offset != 0); | |||
else | else | |||
linfo->index_file_offset -= purge_offset; | linfo->index_file_offset -= purge_offset; | |||
mysql_mutex_unlock(&linfo->lock); | mysql_mutex_unlock(&linfo->lock); | |||
} | } | |||
} | } | |||
mysql_mutex_unlock(&LOCK_thread_count); | mysql_mutex_unlock(&LOCK_thread_count); | |||
} | } | |||
static bool log_in_use(const char* log_name) | static int log_in_use(const char* log_name) | |||
{ | { | |||
size_t log_name_len = strlen(log_name) + 1; | size_t log_name_len = strlen(log_name) + 1; | |||
bool result = 0; | int thread_count=0; | |||
mysql_mutex_lock(&LOCK_thread_count); | mysql_mutex_lock(&LOCK_thread_count); | |||
Thread_iterator it= global_thread_list_begin(); | Thread_iterator it= global_thread_list_begin(); | |||
Thread_iterator end= global_thread_list_end(); | Thread_iterator end= global_thread_list_end(); | |||
for (; it != end; ++it) | for (; it != end; ++it) | |||
{ | { | |||
LOG_INFO* linfo; | LOG_INFO* linfo; | |||
if ((linfo = (*it)->current_linfo)) | if ((linfo = (*it)->current_linfo)) | |||
{ | { | |||
mysql_mutex_lock(&linfo->lock); | mysql_mutex_lock(&linfo->lock); | |||
result = !memcmp(log_name, linfo->log_file_name, log_name_len); | if(!memcmp(log_name, linfo->log_file_name, log_name_len)) | |||
{ | ||||
thread_count++; | ||||
sql_print_warning("file %s was not purged because it was being read | ||||
" | ||||
"by thread number %llu", log_name, | ||||
(ulonglong)(*it)->thread_id); | ||||
} | ||||
mysql_mutex_unlock(&linfo->lock); | mysql_mutex_unlock(&linfo->lock); | |||
if (result) | ||||
break; | ||||
} | } | |||
} | } | |||
mysql_mutex_unlock(&LOCK_thread_count); | mysql_mutex_unlock(&LOCK_thread_count); | |||
return result; | return thread_count; | |||
} | } | |||
static bool purge_error_message(THD* thd, int res) | static bool purge_error_message(THD* thd, int res) | |||
{ | { | |||
uint errcode; | uint errcode; | |||
if ((errcode= purge_log_get_error_code(res)) != 0) | if ((errcode= purge_log_get_error_code(res)) != 0) | |||
{ | { | |||
my_message(errcode, ER(errcode), MYF(0)); | my_message(errcode, ER(errcode), MYF(0)); | |||
return TRUE; | return TRUE; | |||
skipping to change at line 1920 | skipping to change at line 1932 | |||
{ | { | |||
my_ok(thd); | my_ok(thd); | |||
return FALSE; | return FALSE; | |||
} | } | |||
mysql_bin_log.make_log_name(search_file_name, to_log); | mysql_bin_log.make_log_name(search_file_name, to_log); | |||
return purge_error_message(thd, | return purge_error_message(thd, | |||
mysql_bin_log.purge_logs(search_file_name, fal se, | mysql_bin_log.purge_logs(search_file_name, fal se, | |||
true/*need_lock_index =true*/, | true/*need_lock_index =true*/, | |||
true/*need_update_thr eads=true*/, | true/*need_update_thr eads=true*/, | |||
NULL)); | NULL, false)); | |||
} | } | |||
/** | /** | |||
Execute a PURGE BINARY LOGS BEFORE <date> command. | Execute a PURGE BINARY LOGS BEFORE <date> command. | |||
@param thd Pointer to THD object for the client thread executing the | @param thd Pointer to THD object for the client thread executing the | |||
statement. | statement. | |||
@param purge_time Date before which logs should be purged. | @param purge_time Date before which logs should be purged. | |||
skipping to change at line 1942 | skipping to change at line 1954 | |||
@retval TRUE failure | @retval TRUE failure | |||
*/ | */ | |||
bool purge_master_logs_before_date(THD* thd, time_t purge_time) | bool purge_master_logs_before_date(THD* thd, time_t purge_time) | |||
{ | { | |||
if (!mysql_bin_log.is_open()) | if (!mysql_bin_log.is_open()) | |||
{ | { | |||
my_ok(thd); | my_ok(thd); | |||
return 0; | return 0; | |||
} | } | |||
return purge_error_message(thd, | return purge_error_message(thd, | |||
mysql_bin_log.purge_logs_before_date(purge_tim | mysql_bin_log.purge_logs_before_date(purge_tim | |||
e)); | e, | |||
false)); | ||||
} | } | |||
#endif /* EMBEDDED_LIBRARY */ | #endif /* EMBEDDED_LIBRARY */ | |||
/* | /* | |||
Helper function to get the error code of the query to be binlogged. | Helper function to get the error code of the query to be binlogged. | |||
*/ | */ | |||
int query_error_code(THD *thd, bool not_killed) | int query_error_code(THD *thd, bool not_killed) | |||
{ | { | |||
int error; | int error; | |||
skipping to change at line 2132 | skipping to change at line 2145 | |||
mysql_mutex_unlock(&LOCK_thread_count); | mysql_mutex_unlock(&LOCK_thread_count); | |||
if ((file=open_binlog_file(&log, linfo.log_file_name, &errmsg)) < 0) | if ((file=open_binlog_file(&log, linfo.log_file_name, &errmsg)) < 0) | |||
goto err; | goto err; | |||
/* | /* | |||
to account binlog event header size | to account binlog event header size | |||
*/ | */ | |||
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER; | thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER; | |||
DEBUG_SYNC(thd, "after_show_binlog_event_found_file"); | ||||
mysql_mutex_lock(log_lock); | mysql_mutex_lock(log_lock); | |||
/* | /* | |||
open_binlog_file() sought to position 4. | open_binlog_file() sought to position 4. | |||
Read the first event in case it's a Format_description_log_event, to | Read the first event in case it's a Format_description_log_event, to | |||
know the format. If there's no such event, we are 3.23 or 4.x. This | know the format. If there's no such event, we are 3.23 or 4.x. This | |||
code, like before, can't read 3.23 binlogs. | code, like before, can't read 3.23 binlogs. | |||
This code will fail on a mixed relay log (one which has Format_desc t hen | This code will fail on a mixed relay log (one which has Format_desc t hen | |||
Rotate then Format_desc). | Rotate then Format_desc). | |||
*/ | */ | |||
skipping to change at line 2290 | skipping to change at line 2305 | |||
{ | { | |||
DBUG_ENTER("cleanup"); | DBUG_ENTER("cleanup"); | |||
if (inited) | if (inited) | |||
{ | { | |||
inited= 0; | inited= 0; | |||
close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT); | close(LOG_CLOSE_INDEX|LOG_CLOSE_STOP_EVENT); | |||
mysql_mutex_destroy(&LOCK_log); | mysql_mutex_destroy(&LOCK_log); | |||
mysql_mutex_destroy(&LOCK_index); | mysql_mutex_destroy(&LOCK_index); | |||
mysql_mutex_destroy(&LOCK_commit); | mysql_mutex_destroy(&LOCK_commit); | |||
mysql_mutex_destroy(&LOCK_sync); | mysql_mutex_destroy(&LOCK_sync); | |||
mysql_mutex_destroy(&LOCK_xids); | ||||
mysql_cond_destroy(&update_cond); | mysql_cond_destroy(&update_cond); | |||
my_atomic_rwlock_destroy(&m_prep_xids_lock); | my_atomic_rwlock_destroy(&m_prep_xids_lock); | |||
mysql_cond_destroy(&m_prep_xids_cond); | mysql_cond_destroy(&m_prep_xids_cond); | |||
stage_manager.deinit(); | stage_manager.deinit(); | |||
} | } | |||
DBUG_VOID_RETURN; | DBUG_VOID_RETURN; | |||
} | } | |||
void MYSQL_BIN_LOG::init_pthread_objects() | void MYSQL_BIN_LOG::init_pthread_objects() | |||
{ | { | |||
MYSQL_LOG::init_pthread_objects(); | MYSQL_LOG::init_pthread_objects(); | |||
mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW); | mysql_mutex_init(m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW); | |||
mysql_mutex_init(m_key_LOCK_commit, &LOCK_commit, MY_MUTEX_INIT_FAST); | mysql_mutex_init(m_key_LOCK_commit, &LOCK_commit, MY_MUTEX_INIT_FAST); | |||
mysql_mutex_init(m_key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST); | mysql_mutex_init(m_key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST); | |||
mysql_mutex_init(m_key_LOCK_xids, &LOCK_xids, MY_MUTEX_INIT_FAST); | ||||
mysql_cond_init(m_key_update_cond, &update_cond, 0); | mysql_cond_init(m_key_update_cond, &update_cond, 0); | |||
my_atomic_rwlock_init(&m_prep_xids_lock); | my_atomic_rwlock_init(&m_prep_xids_lock); | |||
mysql_cond_init(m_key_prep_xids_cond, &m_prep_xids_cond, NULL); | mysql_cond_init(m_key_prep_xids_cond, &m_prep_xids_cond, NULL); | |||
stage_manager.init( | stage_manager.init( | |||
#ifdef HAVE_PSI_INTERFACE | #ifdef HAVE_PSI_INTERFACE | |||
m_key_LOCK_flush_queue, | m_key_LOCK_flush_queue, | |||
m_key_LOCK_sync_queue, | m_key_LOCK_sync_queue, | |||
m_key_LOCK_commit_queue, | m_key_LOCK_commit_queue, | |||
m_key_LOCK_done, m_key_COND_done | m_key_LOCK_done, m_key_COND_done | |||
#endif | #endif | |||
skipping to change at line 2506 | skipping to change at line 2523 | |||
#ifndef DBUG_OFF | #ifndef DBUG_OFF | |||
char* prev_buffer= prev_gtids_ev->get_str(NULL, NULL); | char* prev_buffer= prev_gtids_ev->get_str(NULL, NULL); | |||
DBUG_PRINT("info", ("Got Previous_gtids from file '%s': Gtid_set='%s' .", | DBUG_PRINT("info", ("Got Previous_gtids from file '%s': Gtid_set='%s' .", | |||
filename, prev_buffer)); | filename, prev_buffer)); | |||
my_free(prev_buffer); | my_free(prev_buffer); | |||
#endif | #endif | |||
break; | break; | |||
} | } | |||
case GTID_LOG_EVENT: | case GTID_LOG_EVENT: | |||
{ | { | |||
DBUG_EXECUTE_IF("inject_fault_bug16502579", { | ||||
DBUG_PRINT("debug", ("GTID_LOG_EVENT found. Injected | ||||
ret=NO_GTIDS.")); | ||||
ret=NO_GTIDS; | ||||
}); | ||||
if (ret != GOT_GTIDS) | if (ret != GOT_GTIDS) | |||
{ | { | |||
if (ret != GOT_PREVIOUS_GTIDS) | if (ret != GOT_PREVIOUS_GTIDS) | |||
// should not happen | { | |||
my_error(ER_MASTER_FATAL_ERROR_READING_BINLOG, MYF(0)); | /* | |||
Since this routine is run on startup, there may not be a | ||||
THD instance. Therefore, ER(X) cannot be used. | ||||
*/ | ||||
const char* msg_fmt= (current_thd != NULL) ? | ||||
ER(ER_BINLOG_LOGICAL_CORRUPTION) : | ||||
ER_DEFAULT(ER_BINLOG_LOGICAL_CORRUPTION); | ||||
my_printf_error(ER_BINLOG_LOGICAL_CORRUPTION, | ||||
msg_fmt, MYF(0), | ||||
filename, | ||||
"The first global transaction identifier was read | ||||
, but " | ||||
"no other information regarding identifiers exist | ||||
ing " | ||||
"on the previous log files was found."); | ||||
ret= ERROR, done= true; | ||||
break; | ||||
} | ||||
else | else | |||
ret= GOT_GTIDS; | ret= GOT_GTIDS; | |||
} | } | |||
/* | /* | |||
When all_gtids==NULL, we just check if the binary log contains | When all_gtids==NULL, we just check if the binary log contains | |||
at least one Gtid_log_event, so that we can distinguish the | at least one Gtid_log_event, so that we can distinguish the | |||
return values GOT_GTID and GOT_PREVIOUS_GTIDS. We don't need | return values GOT_GTID and GOT_PREVIOUS_GTIDS. We don't need | |||
to read anything else from the binary log. | to read anything else from the binary log. | |||
*/ | */ | |||
if (all_gtids == NULL) | if (all_gtids == NULL) | |||
skipping to change at line 3329 | skipping to change at line 3365 | |||
if (!log_name || | if (!log_name || | |||
(log_name_len == fname_len-1 && full_fname[log_name_len] == '\n' && | (log_name_len == fname_len-1 && full_fname[log_name_len] == '\n' && | |||
!memcmp(full_fname, full_log_name, log_name_len))) | !memcmp(full_fname, full_log_name, log_name_len))) | |||
{ | { | |||
DBUG_PRINT("info", ("Found log file entry")); | DBUG_PRINT("info", ("Found log file entry")); | |||
full_fname[fname_len-1]= 0; // remove last \n | full_fname[fname_len-1]= 0; // remove last \n | |||
linfo->index_file_start_offset= offset; | linfo->index_file_start_offset= offset; | |||
linfo->index_file_offset = my_b_tell(&index_file); | linfo->index_file_offset = my_b_tell(&index_file); | |||
break; | break; | |||
} | } | |||
linfo->entry_index++; | ||||
} | } | |||
end: | end: | |||
if (need_lock_index) | if (need_lock_index) | |||
mysql_mutex_unlock(&LOCK_index); | mysql_mutex_unlock(&LOCK_index); | |||
DBUG_RETURN(error); | DBUG_RETURN(error); | |||
} | } | |||
/** | /** | |||
Find the position in the log-index-file for the given log name. | Find the position in the log-index-file for the given log name. | |||
skipping to change at line 3422 | skipping to change at line 3459 | |||
1 error | 1 error | |||
*/ | */ | |||
bool MYSQL_BIN_LOG::reset_logs(THD* thd) | bool MYSQL_BIN_LOG::reset_logs(THD* thd) | |||
{ | { | |||
LOG_INFO linfo; | LOG_INFO linfo; | |||
bool error=0; | bool error=0; | |||
int err; | int err; | |||
const char* save_name; | const char* save_name; | |||
DBUG_ENTER("reset_logs"); | DBUG_ENTER("reset_logs"); | |||
/* | ||||
Flush logs for storage engines, so that the last transaction | ||||
is fsynced inside storage engines. | ||||
*/ | ||||
if (ha_flush_logs(NULL)) | ||||
DBUG_RETURN(1); | ||||
ha_reset_logs(thd); | ha_reset_logs(thd); | |||
/* | /* | |||
The following mutex is needed to ensure that no threads call | The following mutex is needed to ensure that no threads call | |||
'delete thd' as we would then risk missing a 'rollback' from this | 'delete thd' as we would then risk missing a 'rollback' from this | |||
thread. If the transaction involved MyISAM tables, it should go | thread. If the transaction involved MyISAM tables, it should go | |||
into binlog even on rollback. | into binlog even on rollback. | |||
*/ | */ | |||
mysql_mutex_lock(&LOCK_thread_count); | mysql_mutex_lock(&LOCK_thread_count); | |||
skipping to change at line 3746 | skipping to change at line 3790 | |||
/* Store where we are in the new file for the execution thread */ | /* Store where we are in the new file for the execution thread */ | |||
rli->flush_info(TRUE); | rli->flush_info(TRUE); | |||
DBUG_EXECUTE_IF("crash_before_purge_logs", DBUG_SUICIDE();); | DBUG_EXECUTE_IF("crash_before_purge_logs", DBUG_SUICIDE();); | |||
mysql_mutex_lock(&rli->log_space_lock); | mysql_mutex_lock(&rli->log_space_lock); | |||
rli->relay_log.purge_logs(to_purge_if_included, included, | rli->relay_log.purge_logs(to_purge_if_included, included, | |||
false/*need_lock_index=false*/, | false/*need_lock_index=false*/, | |||
false/*need_update_threads=false*/, | false/*need_update_threads=false*/, | |||
&rli->log_space_total); | &rli->log_space_total, true); | |||
// Tell the I/O thread to take the relay_log_space_limit into account | // Tell the I/O thread to take the relay_log_space_limit into account | |||
rli->ignore_log_space_limit= 0; | rli->ignore_log_space_limit= 0; | |||
mysql_mutex_unlock(&rli->log_space_lock); | mysql_mutex_unlock(&rli->log_space_lock); | |||
/* | /* | |||
Ok to broadcast after the critical region as there is no risk of | Ok to broadcast after the critical region as there is no risk of | |||
the mutex being destroyed by this thread later - this helps save | the mutex being destroyed by this thread later - this helps save | |||
context switches | context switches | |||
*/ | */ | |||
mysql_cond_broadcast(&rli->log_space_cond); | mysql_cond_broadcast(&rli->log_space_cond); | |||
skipping to change at line 3855 | skipping to change at line 3899 | |||
/** | /** | |||
Remove all logs before the given log from disk and from the index file. | Remove all logs before the given log from disk and from the index file. | |||
@param to_log Delete all log file name before this file. | @param to_log Delete all log file name before this file. | |||
@param included If true, to_log is deleted too. | @param included If true, to_log is deleted too. | |||
@param need_lock_index | @param need_lock_index | |||
@param need_update_threads If we want to update the log coordinates of | @param need_update_threads If we want to update the log coordinates of | |||
all threads. False for relay logs, true otherw ise. | all threads. False for relay logs, true otherw ise. | |||
@param freed_log_space If not null, decrement this variable of | @param freed_log_space If not null, decrement this variable of | |||
the amount of log space freed | the amount of log space freed | |||
@param auto_purge True if this is an automatic purge. | ||||
@note | @note | |||
If any of the logs before the deleted one is in use, | If any of the logs before the deleted one is in use, | |||
only purge logs up to this one. | only purge logs up to this one. | |||
@retval | @retval | |||
0 ok | 0 ok | |||
@retval | @retval | |||
LOG_INFO_EOF to_log not found | LOG_INFO_EOF to_log not found | |||
LOG_INFO_EMFILE too many files opened | LOG_INFO_EMFILE too many files opened | |||
LOG_INFO_FATAL if any other than ENOENT error from | LOG_INFO_FATAL if any other than ENOENT error from | |||
mysql_file_stat() or mysql_file_delete() | mysql_file_stat() or mysql_file_delete() | |||
*/ | */ | |||
int MYSQL_BIN_LOG::purge_logs(const char *to_log, | int MYSQL_BIN_LOG::purge_logs(const char *to_log, | |||
bool included, | bool included, | |||
bool need_lock_index, | bool need_lock_index, | |||
bool need_update_threads, | bool need_update_threads, | |||
ulonglong *decrease_log_space) | ulonglong *decrease_log_space, | |||
bool auto_purge) | ||||
{ | { | |||
int error= 0; | int error= 0, no_of_log_files_to_purge= 0, no_of_log_files_purged= 0; | |||
int no_of_threads_locking_log= 0; | ||||
bool exit_loop= 0; | bool exit_loop= 0; | |||
LOG_INFO log_info; | LOG_INFO log_info; | |||
THD *thd= current_thd; | THD *thd= current_thd; | |||
DBUG_ENTER("purge_logs"); | DBUG_ENTER("purge_logs"); | |||
DBUG_PRINT("info",("to_log= %s",to_log)); | DBUG_PRINT("info",("to_log= %s",to_log)); | |||
if (need_lock_index) | if (need_lock_index) | |||
mysql_mutex_lock(&LOCK_index); | mysql_mutex_lock(&LOCK_index); | |||
else | else | |||
mysql_mutex_assert_owner(&LOCK_index); | mysql_mutex_assert_owner(&LOCK_index); | |||
if ((error=find_log_pos(&log_info, to_log, false/*need_lock_index=false*/ ))) | if ((error=find_log_pos(&log_info, to_log, false/*need_lock_index=false*/ ))) | |||
{ | { | |||
sql_print_error("MYSQL_BIN_LOG::purge_logs was called with file %s not " | sql_print_error("MYSQL_BIN_LOG::purge_logs was called with file %s not " | |||
"listed in the index.", to_log); | "listed in the index.", to_log); | |||
goto err; | goto err; | |||
} | } | |||
no_of_log_files_to_purge= log_info.entry_index; | ||||
if ((error= open_purge_index_file(TRUE))) | if ((error= open_purge_index_file(TRUE))) | |||
{ | { | |||
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to sync the index fil e."); | sql_print_error("MYSQL_BIN_LOG::purge_logs failed to sync the index fil e."); | |||
goto err; | goto err; | |||
} | } | |||
/* | /* | |||
File name exists in index file; delete until we find this file | File name exists in index file; delete until we find this file | |||
or a file that is used. | or a file that is used. | |||
*/ | */ | |||
if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/) )) | if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/) )) | |||
goto err; | goto err; | |||
while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)) && | ||||
!is_active(log_info.log_file_name) && | while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included))) | |||
!log_in_use(log_info.log_file_name)) | ||||
{ | { | |||
if(is_active(log_info.log_file_name)) | ||||
{ | ||||
if(!auto_purge) | ||||
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, | ||||
ER_WARN_PURGE_LOG_IS_ACTIVE, | ||||
ER(ER_WARN_PURGE_LOG_IS_ACTIVE), | ||||
log_info.log_file_name); | ||||
break; | ||||
} | ||||
if ((no_of_threads_locking_log= log_in_use(log_info.log_file_name))) | ||||
{ | ||||
if(!auto_purge) | ||||
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, | ||||
ER_WARN_PURGE_LOG_IN_USE, | ||||
ER(ER_WARN_PURGE_LOG_IN_USE), | ||||
log_info.log_file_name, no_of_threads_locking_ | ||||
log, | ||||
no_of_log_files_purged, no_of_log_files_to_purg | ||||
e); | ||||
break; | ||||
} | ||||
no_of_log_files_purged++; | ||||
if ((error= register_purge_index_entry(log_info.log_file_name))) | if ((error= register_purge_index_entry(log_info.log_file_name))) | |||
{ | { | |||
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to copy %s to regis ter file.", | sql_print_error("MYSQL_BIN_LOG::purge_logs failed to copy %s to regis ter file.", | |||
log_info.log_file_name); | log_info.log_file_name); | |||
goto err; | goto err; | |||
} | } | |||
if (find_next_log(&log_info, false/*need_lock_index=false*/) || exit_lo op) | if (find_next_log(&log_info, false/*need_lock_index=false*/) || exit_lo op) | |||
break; | break; | |||
} | } | |||
skipping to change at line 3939 | skipping to change at line 4009 | |||
if ((error=remove_logs_from_index(&log_info, need_update_threads))) | if ((error=remove_logs_from_index(&log_info, need_update_threads))) | |||
{ | { | |||
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to update the index f ile"); | sql_print_error("MYSQL_BIN_LOG::purge_logs failed to update the index f ile"); | |||
goto err; | goto err; | |||
} | } | |||
// Update gtid_state->lost_gtids | // Update gtid_state->lost_gtids | |||
if (gtid_mode > 0 && !is_relay_log) | if (gtid_mode > 0 && !is_relay_log) | |||
{ | { | |||
global_sid_lock->wrlock(); | global_sid_lock->wrlock(); | |||
if (init_gtid_sets(NULL, | error= init_gtid_sets(NULL, | |||
const_cast<Gtid_set *>(gtid_state->get_lost_gtids()) , | const_cast<Gtid_set *>(gtid_state->get_lost_gtids()) , | |||
opt_master_verify_checksum, | opt_master_verify_checksum, | |||
false/*false=don't need lock*/)) | false/*false=don't need lock*/); | |||
goto err; | ||||
global_sid_lock->unlock(); | global_sid_lock->unlock(); | |||
if (error) | ||||
goto err; | ||||
} | } | |||
DBUG_EXECUTE_IF("crash_purge_critical_after_update_index", DBUG_SUICIDE() ;); | DBUG_EXECUTE_IF("crash_purge_critical_after_update_index", DBUG_SUICIDE() ;); | |||
err: | err: | |||
int error_index= 0, close_error_index= 0; | ||||
/* Read each entry from purge_index_file and delete the file. */ | /* Read each entry from purge_index_file and delete the file. */ | |||
if (is_inited_purge_index_file() && | if (is_inited_purge_index_file() && | |||
(error= purge_index_entry(thd, decrease_log_space, false/*need_lock_i ndex=false*/))) | (error_index= purge_index_entry(thd, decrease_log_space, false/*need_ lock_index=false*/))) | |||
sql_print_error("MYSQL_BIN_LOG::purge_logs failed to process registered files" | sql_print_error("MYSQL_BIN_LOG::purge_logs failed to process registered files" | |||
" that would be purged."); | " that would be purged."); | |||
close_purge_index_file(); | ||||
close_error_index= close_purge_index_file(); | ||||
DBUG_EXECUTE_IF("crash_purge_non_critical_after_update_index", DBUG_SUICI DE();); | DBUG_EXECUTE_IF("crash_purge_non_critical_after_update_index", DBUG_SUICI DE();); | |||
if (need_lock_index) | if (need_lock_index) | |||
mysql_mutex_unlock(&LOCK_index); | mysql_mutex_unlock(&LOCK_index); | |||
/* | ||||
Error codes from purge logs take precedence. | ||||
Then error codes from purging the index entry. | ||||
Finally, error codes from closing the purge index file. | ||||
*/ | ||||
error= error ? error : (error_index ? error_index : | ||||
close_error_index); | ||||
DBUG_RETURN(error); | DBUG_RETURN(error); | |||
} | } | |||
int MYSQL_BIN_LOG::set_purge_index_file_name(const char *base_file_name) | int MYSQL_BIN_LOG::set_purge_index_file_name(const char *base_file_name) | |||
{ | { | |||
int error= 0; | int error= 0; | |||
DBUG_ENTER("MYSQL_BIN_LOG::set_purge_index_file_name"); | DBUG_ENTER("MYSQL_BIN_LOG::set_purge_index_file_name"); | |||
if (fn_format(purge_index_file_name, base_file_name, mysql_data_home, | if (fn_format(purge_index_file_name, base_file_name, mysql_data_home, | |||
".~rec~", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH | | ".~rec~", MYF(MY_UNPACK_FILENAME | MY_SAFE_PATH | | |||
MY_REPLACE_EXT)) == NULL) | MY_REPLACE_EXT)) == NULL) | |||
skipping to change at line 4245 | skipping to change at line 4328 | |||
err: | err: | |||
DBUG_RETURN(error); | DBUG_RETURN(error); | |||
} | } | |||
/** | /** | |||
Remove all logs before the given file date from disk and from the | Remove all logs before the given file date from disk and from the | |||
index file. | index file. | |||
@param thd Thread pointer | @param thd Thread pointer | |||
@param purge_time Delete all log files before given date. | @param purge_time Delete all log files before given date. | |||
@param auto_purge True if this is an automatic purge. | ||||
@note | @note | |||
If any of the logs before the deleted one is in use, | If any of the logs before the deleted one is in use, | |||
only purge logs up to this one. | only purge logs up to this one. | |||
@retval | @retval | |||
0 ok | 0 ok | |||
@retval | @retval | |||
LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated | LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated | |||
LOG_INFO_FATAL if any other than ENOENT error from | LOG_INFO_FATAL if any other than ENOENT error from | |||
mysql_file_stat() or mysql_file_delete() | mysql_file_stat() or mysql_file_delete() | |||
*/ | */ | |||
int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time) | int MYSQL_BIN_LOG::purge_logs_before_date(time_t purge_time, bool auto_purg e) | |||
{ | { | |||
int error; | int error; | |||
char to_log[FN_REFLEN]; | int no_of_threads_locking_log= 0, no_of_log_files_purged= 0; | |||
bool log_is_active= false, log_is_in_use= false; | ||||
char to_log[FN_REFLEN], copy_log_in_use[FN_REFLEN]; | ||||
LOG_INFO log_info; | LOG_INFO log_info; | |||
MY_STAT stat_area; | MY_STAT stat_area; | |||
THD *thd= current_thd; | THD *thd= current_thd; | |||
DBUG_ENTER("purge_logs_before_date"); | DBUG_ENTER("purge_logs_before_date"); | |||
mysql_mutex_lock(&LOCK_index); | mysql_mutex_lock(&LOCK_index); | |||
to_log[0]= 0; | to_log[0]= 0; | |||
if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/) )) | if ((error=find_log_pos(&log_info, NullS, false/*need_lock_index=false*/) )) | |||
goto err; | goto err; | |||
while (strcmp(log_file_name, log_info.log_file_name) && | while (!(log_is_active= is_active(log_info.log_file_name))) | |||
!is_active(log_info.log_file_name) && | ||||
!log_in_use(log_info.log_file_name)) | ||||
{ | { | |||
if ((no_of_threads_locking_log= log_in_use(log_info.log_file_name))) | ||||
{ | ||||
if (!auto_purge) | ||||
{ | ||||
log_is_in_use= true; | ||||
strcpy(copy_log_in_use, log_info.log_file_name); | ||||
} | ||||
break; | ||||
} | ||||
no_of_log_files_purged++; | ||||
if (!mysql_file_stat(m_key_file_log, | if (!mysql_file_stat(m_key_file_log, | |||
log_info.log_file_name, &stat_area, MYF(0))) | log_info.log_file_name, &stat_area, MYF(0))) | |||
{ | { | |||
if (my_errno == ENOENT) | if (my_errno == ENOENT) | |||
{ | { | |||
/* | /* | |||
It's not fatal if we can't stat a log file that does not exist. | It's not fatal if we can't stat a log file that does not exist. | |||
*/ | */ | |||
my_errno= 0; | my_errno= 0; | |||
} | } | |||
skipping to change at line 4325 | skipping to change at line 4420 | |||
strmake(to_log, | strmake(to_log, | |||
log_info.log_file_name, | log_info.log_file_name, | |||
sizeof(log_info.log_file_name) - 1); | sizeof(log_info.log_file_name) - 1); | |||
else | else | |||
break; | break; | |||
} | } | |||
if (find_next_log(&log_info, false/*need_lock_index=false*/)) | if (find_next_log(&log_info, false/*need_lock_index=false*/)) | |||
break; | break; | |||
} | } | |||
if (log_is_active) | ||||
{ | ||||
if(!auto_purge) | ||||
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, | ||||
ER_WARN_PURGE_LOG_IS_ACTIVE, | ||||
ER(ER_WARN_PURGE_LOG_IS_ACTIVE), | ||||
log_info.log_file_name); | ||||
} | ||||
if (log_is_in_use) | ||||
{ | ||||
int no_of_log_files_to_purge= no_of_log_files_purged+1; | ||||
while (strcmp(log_file_name, log_info.log_file_name)) | ||||
{ | ||||
if (mysql_file_stat(m_key_file_log, log_info.log_file_name, | ||||
&stat_area, MYF(0))) | ||||
{ | ||||
if (stat_area.st_mtime < purge_time) | ||||
no_of_log_files_to_purge++; | ||||
else | ||||
break; | ||||
} | ||||
if (find_next_log(&log_info, false/*need_lock_index=false*/)) | ||||
{ | ||||
no_of_log_files_to_purge++; | ||||
break; | ||||
} | ||||
} | ||||
push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, | ||||
ER_WARN_PURGE_LOG_IN_USE, | ||||
ER(ER_WARN_PURGE_LOG_IN_USE), | ||||
copy_log_in_use, no_of_threads_locking_log, | ||||
no_of_log_files_purged, no_of_log_files_to_purge); | ||||
} | ||||
error= (to_log[0] ? purge_logs(to_log, true, | error= (to_log[0] ? purge_logs(to_log, true, | |||
false/*need_lock_index=false*/, | false/*need_lock_index=false*/, | |||
true/*need_update_threads=true*/, | true/*need_update_threads=true*/, | |||
(ulonglong *) 0) : 0); | (ulonglong *) 0, auto_purge) : 0); | |||
err: | err: | |||
mysql_mutex_unlock(&LOCK_index); | mysql_mutex_unlock(&LOCK_index); | |||
DBUG_RETURN(error); | DBUG_RETURN(error); | |||
} | } | |||
#endif /* HAVE_REPLICATION */ | #endif /* HAVE_REPLICATION */ | |||
/** | /** | |||
Create a new log file name. | Create a new log file name. | |||
skipping to change at line 4416 | skipping to change at line 4548 | |||
if (!is_open()) | if (!is_open()) | |||
{ | { | |||
DBUG_PRINT("info",("log is closed")); | DBUG_PRINT("info",("log is closed")); | |||
DBUG_RETURN(error); | DBUG_RETURN(error); | |||
} | } | |||
if (need_lock_log) | if (need_lock_log) | |||
mysql_mutex_lock(&LOCK_log); | mysql_mutex_lock(&LOCK_log); | |||
else | else | |||
mysql_mutex_assert_owner(&LOCK_log); | mysql_mutex_assert_owner(&LOCK_log); | |||
mysql_mutex_lock(&LOCK_commit); | DBUG_EXECUTE_IF("semi_sync_3-way_deadlock", | |||
DEBUG_SYNC(current_thd, "before_rotate_binlog");); | ||||
mysql_mutex_lock(&LOCK_xids); | ||||
/* | /* | |||
We need to ensure that the number of prepared XIDs are 0. | We need to ensure that the number of prepared XIDs are 0. | |||
If m_prep_xids is not zero: | If m_prep_xids is not zero: | |||
- We release the LOCK_commit lock to allow sessions to commit, | - We wait for storage engine commit, hence decrease m_prep_xids | |||
hence decrease m_prep_xids | ||||
- We keep the LOCK_log to block new transactions from being | - We keep the LOCK_log to block new transactions from being | |||
written to the binary log. | written to the binary log. | |||
*/ | */ | |||
while (get_prep_xids() > 0) | while (get_prep_xids() > 0) | |||
mysql_cond_wait(&m_prep_xids_cond, &LOCK_commit); | mysql_cond_wait(&m_prep_xids_cond, &LOCK_xids); | |||
mysql_mutex_unlock(&LOCK_xids); | ||||
mysql_mutex_lock(&LOCK_index); | mysql_mutex_lock(&LOCK_index); | |||
if ((error= ha_flush_logs(0))) | if (DBUG_EVALUATE_IF("expire_logs_always", 0, 1) | |||
&& (error= ha_flush_logs(NULL))) | ||||
goto end; | goto end; | |||
mysql_mutex_assert_owner(&LOCK_log); | mysql_mutex_assert_owner(&LOCK_log); | |||
mysql_mutex_assert_owner(&LOCK_commit); | ||||
mysql_mutex_assert_owner(&LOCK_index); | mysql_mutex_assert_owner(&LOCK_index); | |||
/* Reuse old name if not binlog and not update log */ | /* Reuse old name if not binlog and not update log */ | |||
new_name_ptr= name; | new_name_ptr= name; | |||
/* | /* | |||
If user hasn't specified an extension, generate a new log name | If user hasn't specified an extension, generate a new log name | |||
We have to do this here and not in open as we want to store the | We have to do this here and not in open as we want to store the | |||
new file name in the current binary log file. | new file name in the current binary log file. | |||
*/ | */ | |||
skipping to change at line 4557 | skipping to change at line 4692 | |||
close(LOG_CLOSE_INDEX); | close(LOG_CLOSE_INDEX); | |||
sql_print_error("Could not open %s for logging (error %d). " | sql_print_error("Could not open %s for logging (error %d). " | |||
"Turning logging off for the whole duration " | "Turning logging off for the whole duration " | |||
"of the MySQL server process. To turn it on " | "of the MySQL server process. To turn it on " | |||
"again: fix the cause, shutdown the MySQL " | "again: fix the cause, shutdown the MySQL " | |||
"server and restart it.", | "server and restart it.", | |||
new_name_ptr, errno); | new_name_ptr, errno); | |||
} | } | |||
mysql_mutex_unlock(&LOCK_index); | mysql_mutex_unlock(&LOCK_index); | |||
mysql_mutex_unlock(&LOCK_commit); | ||||
if (need_lock_log) | if (need_lock_log) | |||
mysql_mutex_unlock(&LOCK_log); | mysql_mutex_unlock(&LOCK_log); | |||
DBUG_RETURN(error); | DBUG_RETURN(error); | |||
} | } | |||
#ifdef HAVE_REPLICATION | #ifdef HAVE_REPLICATION | |||
/** | /** | |||
Called after an event has been written to the relay log by the IO | Called after an event has been written to the relay log by the IO | |||
thread. This flushes and possibly syncs the file (according to the | thread. This flushes and possibly syncs the file (according to the | |||
skipping to change at line 4981 | skipping to change at line 5115 | |||
@retval | @retval | |||
nonzero - error in rotating routine. | nonzero - error in rotating routine. | |||
*/ | */ | |||
void MYSQL_BIN_LOG::purge() | void MYSQL_BIN_LOG::purge() | |||
{ | { | |||
#ifdef HAVE_REPLICATION | #ifdef HAVE_REPLICATION | |||
if (expire_logs_days) | if (expire_logs_days) | |||
{ | { | |||
DEBUG_SYNC(current_thd, "at_purge_logs_before_date"); | DEBUG_SYNC(current_thd, "at_purge_logs_before_date"); | |||
time_t purge_time= my_time(0) - expire_logs_days*24*60*60; | time_t purge_time= my_time(0) - expire_logs_days*24*60*60; | |||
DBUG_EXECUTE_IF("expire_logs_always", | ||||
{ purge_time= my_time(0);}); | ||||
if (purge_time >= 0) | if (purge_time >= 0) | |||
{ | { | |||
purge_logs_before_date(purge_time); | /* | |||
Flush logs for storage engines, so that the last transaction | ||||
is fsynced inside storage engines. | ||||
*/ | ||||
ha_flush_logs(NULL); | ||||
purge_logs_before_date(purge_time, true); | ||||
} | } | |||
} | } | |||
#endif | #endif | |||
} | } | |||
/** | /** | |||
The method is a shortcut of @c rotate() and @c purge(). | The method is a shortcut of @c rotate() and @c purge(). | |||
LOCK_log is acquired prior to rotate and is released after it. | LOCK_log is acquired prior to rotate and is released after it. | |||
@param force_rotate caller can request the log rotation | @param force_rotate caller can request the log rotation | |||
skipping to change at line 5406 | skipping to change at line 5547 | |||
'cache' needs to be reinitialized after this functions returns. | 'cache' needs to be reinitialized after this functions returns. | |||
*/ | */ | |||
bool MYSQL_BIN_LOG::write_cache(THD *thd, binlog_cache_data *cache_data) | bool MYSQL_BIN_LOG::write_cache(THD *thd, binlog_cache_data *cache_data) | |||
{ | { | |||
DBUG_ENTER("MYSQL_BIN_LOG::write_cache(THD *, binlog_cache_data *, bool)" ); | DBUG_ENTER("MYSQL_BIN_LOG::write_cache(THD *, binlog_cache_data *, bool)" ); | |||
IO_CACHE *cache= &cache_data->cache_log; | IO_CACHE *cache= &cache_data->cache_log; | |||
bool incident= cache_data->has_incident(); | bool incident= cache_data->has_incident(); | |||
DBUG_EXECUTE_IF("simulate_binlog_flush_error", | ||||
{ | ||||
if (rand() % 3 == 0) | ||||
{ | ||||
write_error=1; | ||||
goto err; | ||||
} | ||||
};); | ||||
mysql_mutex_assert_owner(&LOCK_log); | mysql_mutex_assert_owner(&LOCK_log); | |||
DBUG_ASSERT(is_open()); | DBUG_ASSERT(is_open()); | |||
if (likely(is_open())) // Should always be true | if (likely(is_open())) // Should always be true | |||
{ | { | |||
/* | /* | |||
We only bother to write to the binary log if there is anything | We only bother to write to the binary log if there is anything | |||
to write. | to write. | |||
*/ | */ | |||
if (my_b_tell(cache) > 0) | if (my_b_tell(cache) > 0) | |||
skipping to change at line 5466 | skipping to change at line 5616 | |||
DBUG_RETURN(0); | DBUG_RETURN(0); | |||
err: | err: | |||
if (!write_error) | if (!write_error) | |||
{ | { | |||
char errbuf[MYSYS_STRERROR_SIZE]; | char errbuf[MYSYS_STRERROR_SIZE]; | |||
write_error= 1; | write_error= 1; | |||
sql_print_error(ER(ER_ERROR_ON_WRITE), name, | sql_print_error(ER(ER_ERROR_ON_WRITE), name, | |||
errno, my_strerror(errbuf, sizeof(errbuf), errno)); | errno, my_strerror(errbuf, sizeof(errbuf), errno)); | |||
} | } | |||
thd->commit_error= THD::CE_FLUSH_ERROR; | ||||
DBUG_RETURN(1); | DBUG_RETURN(1); | |||
} | } | |||
/** | /** | |||
Wait until we get a signal that the relay log has been updated. | Wait until we get a signal that the relay log has been updated. | |||
@param[in] thd Thread variable | @param[in] thd Thread variable | |||
@param[in] timeout a pointer to a timespec; | @param[in] timeout a pointer to a timespec; | |||
NULL means to wait w/o timeout. | NULL means to wait w/o timeout. | |||
skipping to change at line 6016 | skipping to change at line 6168 | |||
bool wrote_xid= false; | bool wrote_xid= false; | |||
int error= cache_mngr->flush(thd, &bytes, &wrote_xid); | int error= cache_mngr->flush(thd, &bytes, &wrote_xid); | |||
if (!error && bytes > 0) | if (!error && bytes > 0) | |||
{ | { | |||
/* | /* | |||
Note that set_trans_pos does not copy the file name. See | Note that set_trans_pos does not copy the file name. See | |||
this function documentation for more info. | this function documentation for more info. | |||
*/ | */ | |||
thd->set_trans_pos(log_file_name, my_b_tell(&log_file)); | thd->set_trans_pos(log_file_name, my_b_tell(&log_file)); | |||
if (wrote_xid) | if (wrote_xid) | |||
{ | inc_prep_xids(thd); | |||
inc_prep_xids(); | ||||
thd->transaction.flags.xid_written= true; | ||||
} | ||||
} | } | |||
DBUG_PRINT("debug", ("bytes: %llu", bytes)); | DBUG_PRINT("debug", ("bytes: %llu", bytes)); | |||
return std::make_pair(error, bytes); | return std::make_pair(error, bytes); | |||
} | } | |||
/** | /** | |||
Execute the flush stage. | Execute the flush stage. | |||
@param total_bytes_var Pointer to variable that will be set to total | @param total_bytes_var Pointer to variable that will be set to total | |||
number of bytes flushed, or NULL. | number of bytes flushed, or NULL. | |||
skipping to change at line 6045 | skipping to change at line 6194 | |||
@return Error code on error, zero on success | @return Error code on error, zero on success | |||
*/ | */ | |||
int | int | |||
MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var, | MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var, | |||
bool *rotate_var, | bool *rotate_var, | |||
THD **out_queue_var) | THD **out_queue_var) | |||
{ | { | |||
DBUG_ASSERT(total_bytes_var && rotate_var && out_queue_var); | DBUG_ASSERT(total_bytes_var && rotate_var && out_queue_var); | |||
my_off_t total_bytes= 0; | my_off_t total_bytes= 0; | |||
int flush_error= 0; | int flush_error= 1; | |||
mysql_mutex_assert_owner(&LOCK_log); | mysql_mutex_assert_owner(&LOCK_log); | |||
my_atomic_rwlock_rdlock(&opt_binlog_max_flush_queue_time_lock); | my_atomic_rwlock_rdlock(&opt_binlog_max_flush_queue_time_lock); | |||
const ulonglong max_udelay= my_atomic_load32(&opt_binlog_max_flush_queue_ time); | const ulonglong max_udelay= my_atomic_load32(&opt_binlog_max_flush_queue_ time); | |||
my_atomic_rwlock_rdunlock(&opt_binlog_max_flush_queue_time_lock); | my_atomic_rwlock_rdunlock(&opt_binlog_max_flush_queue_time_lock); | |||
const ulonglong start_utime= max_udelay > 0 ? my_micro_time() : 0; | const ulonglong start_utime= max_udelay > 0 ? my_micro_time() : 0; | |||
/* | /* | |||
First we read the queue until it either is empty or the difference | First we read the queue until it either is empty or the difference | |||
between the time we started and the current time is too large. | between the time we started and the current time is too large. | |||
skipping to change at line 6068 | skipping to change at line 6217 | |||
beginning of the out queue. | beginning of the out queue. | |||
*/ | */ | |||
bool has_more= true; | bool has_more= true; | |||
THD *first_seen= NULL; | THD *first_seen= NULL; | |||
while ((max_udelay == 0 || my_micro_time() < start_utime + max_udelay) && has_more) | while ((max_udelay == 0 || my_micro_time() < start_utime + max_udelay) && has_more) | |||
{ | { | |||
std::pair<bool,THD*> current= stage_manager.pop_front(Stage_manager::FL USH_STAGE); | std::pair<bool,THD*> current= stage_manager.pop_front(Stage_manager::FL USH_STAGE); | |||
std::pair<int,my_off_t> result= flush_thread_caches(current.second); | std::pair<int,my_off_t> result= flush_thread_caches(current.second); | |||
has_more= current.first; | has_more= current.first; | |||
total_bytes+= result.second; | total_bytes+= result.second; | |||
if (flush_error == 0) | if (flush_error == 1) | |||
flush_error= result.first; | flush_error= result.first; | |||
if (first_seen == NULL) | if (first_seen == NULL) | |||
first_seen= current.second; | first_seen= current.second; | |||
} | } | |||
/* | /* | |||
Either the queue is empty, or we ran out of time. If we ran out of | Either the queue is empty, or we ran out of time. If we ran out of | |||
time, we have to fetch the entire queue (and flush it) since | time, we have to fetch the entire queue (and flush it) since | |||
otherwise the next batch will not have a leader. | otherwise the next batch will not have a leader. | |||
*/ | */ | |||
if (has_more) | if (has_more) | |||
{ | { | |||
THD *queue= stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE); | THD *queue= stage_manager.fetch_queue_for(Stage_manager::FLUSH_STAGE); | |||
for (THD *head= queue ; head ; head = head->next_to_commit) | for (THD *head= queue ; head ; head = head->next_to_commit) | |||
{ | { | |||
std::pair<int,my_off_t> result= flush_thread_caches(head); | std::pair<int,my_off_t> result= flush_thread_caches(head); | |||
total_bytes+= result.second; | total_bytes+= result.second; | |||
if (flush_error == 0) | if (flush_error == 1) | |||
flush_error= result.first; | flush_error= result.first; | |||
} | } | |||
if (first_seen == NULL) | if (first_seen == NULL) | |||
first_seen= queue; | first_seen= queue; | |||
} | } | |||
*out_queue_var= first_seen; | *out_queue_var= first_seen; | |||
*total_bytes_var= total_bytes; | *total_bytes_var= total_bytes; | |||
if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t) max_size) | if (total_bytes > 0 && my_b_tell(&log_file) >= (my_off_t) max_size) | |||
*rotate_var= true; | *rotate_var= true; | |||
skipping to change at line 6112 | skipping to change at line 6261 | |||
This function commit an entire queue of sessions starting with the | This function commit an entire queue of sessions starting with the | |||
session in @c first. If there were an error in the flushing part of | session in @c first. If there were an error in the flushing part of | |||
the ordered commit, the error code is passed in and all the threads | the ordered commit, the error code is passed in and all the threads | |||
are marked accordingly (but not committed). | are marked accordingly (but not committed). | |||
@see MYSQL_BIN_LOG::ordered_commit | @see MYSQL_BIN_LOG::ordered_commit | |||
@param thd The "master" thread | @param thd The "master" thread | |||
@param first First thread in the queue of threads to commit | @param first First thread in the queue of threads to commit | |||
@param flush_error Error code from flush operation. | ||||
*/ | */ | |||
void | void | |||
MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first, | MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first) | |||
int flush_error) | ||||
{ | { | |||
mysql_mutex_assert_owner(&LOCK_commit); | mysql_mutex_assert_owner(&LOCK_commit); | |||
Thread_excursion excursion(thd); | Thread_excursion excursion(thd); | |||
#ifndef DBUG_OFF | #ifndef DBUG_OFF | |||
thd->transaction.flags.ready_preempt= 1; // formality by the leader | thd->transaction.flags.ready_preempt= 1; // formality by the leader | |||
#endif | #endif | |||
for (THD *head= first ; head ; head = head->next_to_commit) | for (THD *head= first ; head ; head = head->next_to_commit) | |||
{ | { | |||
DBUG_PRINT("debug", ("Thread ID: %lu, commit_error: %d, flags.pending: %s", | DBUG_PRINT("debug", ("Thread ID: %lu, commit_error: %d, flags.pending: %s", | |||
head->thread_id, head->commit_error, | head->thread_id, head->commit_error, | |||
skipping to change at line 6140 | skipping to change at line 6287 | |||
If flushing failed, set commit_error for the session, skip the | If flushing failed, set commit_error for the session, skip the | |||
transaction and proceed with the next transaction instead. This | transaction and proceed with the next transaction instead. This | |||
will mark all threads as failed, since the flush failed. | will mark all threads as failed, since the flush failed. | |||
If flush succeeded, attach to the session and commit it in the | If flush succeeded, attach to the session and commit it in the | |||
engines. | engines. | |||
*/ | */ | |||
#ifndef DBUG_OFF | #ifndef DBUG_OFF | |||
stage_manager.clear_preempt_status(head); | stage_manager.clear_preempt_status(head); | |||
#endif | #endif | |||
if (flush_error != 0) | if (head->commit_error != THD::CE_NONE) | |||
head->commit_error= flush_error; | ; | |||
else if (int error= excursion.attach_to(head)) | else if (excursion.attach_to(head)) | |||
head->commit_error= error; | { | |||
head->commit_error= THD::CE_COMMIT_ERROR; | ||||
sql_print_error("Out of memory while attaching to session thread " | ||||
"during the group commit phase."); | ||||
} | ||||
else | else | |||
{ | { | |||
bool all= head->transaction.flags.real_commit; | bool all= head->transaction.flags.real_commit; | |||
if (head->transaction.flags.commit_low) | if (head->transaction.flags.commit_low) | |||
{ | { | |||
/* head is parked to have exited append() */ | /* head is parked to have exited append() */ | |||
DBUG_ASSERT(head->transaction.flags.ready_preempt); | DBUG_ASSERT(head->transaction.flags.ready_preempt); | |||
/* | ||||
if (int error= ha_commit_low(head, all)) | storage engine commit | |||
head->commit_error= error; | */ | |||
else if (head->transaction.flags.xid_written) | if (ha_commit_low(head, all, false)) | |||
dec_prep_xids(); | head->commit_error= THD::CE_COMMIT_ERROR; | |||
} | } | |||
DBUG_PRINT("debug", ("commit_error: %d, flags.pending: %s", | DBUG_PRINT("debug", ("commit_error: %d, flags.pending: %s", | |||
head->commit_error, | head->commit_error, | |||
YESNO(head->transaction.flags.pending))); | YESNO(head->transaction.flags.pending))); | |||
} | } | |||
/* | ||||
Decrement the prepared XID counter after storage engine commit. | ||||
We also need decrement the prepared XID when encountering a | ||||
flush error or session attach error for avoiding 3-way deadlock | ||||
among user thread, rotate thread and dump thread. | ||||
*/ | ||||
if (head->transaction.flags.xid_written) | ||||
dec_prep_xids(head); | ||||
} | ||||
} | ||||
/** | ||||
Process after commit for a sequence of sessions. | ||||
@param thd The "master" thread | ||||
@param first First thread in the queue of threads to commit | ||||
*/ | ||||
void | ||||
MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first) | ||||
{ | ||||
Thread_excursion excursion(thd); | ||||
for (THD *head= first; head; head= head->next_to_commit) | ||||
{ | ||||
if (head->transaction.flags.run_hooks && | ||||
head->commit_error == THD::CE_NONE) | ||||
{ | ||||
if (excursion.attach_to(head)) | ||||
{ | ||||
head->commit_error= THD::CE_COMMIT_ERROR; | ||||
sql_print_error("Out of memory while attaching to session thread " | ||||
"during the group commit phase."); | ||||
} | ||||
if (head->commit_error == THD::CE_NONE) | ||||
{ | ||||
bool all= head->transaction.flags.real_commit; | ||||
(void) RUN_HOOK(transaction, after_commit, (head, all)); | ||||
/* | ||||
When after_commit finished for the transaction, clear the run_hoo | ||||
ks flag. | ||||
This allow other parts of the system to check if after_commit was | ||||
called. | ||||
*/ | ||||
head->transaction.flags.run_hooks= false; | ||||
} | ||||
} | ||||
} | } | |||
} | } | |||
#ifndef DBUG_OFF | #ifndef DBUG_OFF | |||
/** Names for the stages. */ | /** Names for the stages. */ | |||
static const char* g_stage_name[] = { | static const char* g_stage_name[] = { | |||
"FLUSH", | "FLUSH", | |||
"SYNC", | "SYNC", | |||
"COMMIT", | "COMMIT", | |||
}; | }; | |||
skipping to change at line 6284 | skipping to change at line 6479 | |||
if (enter_stage(thd, Thread_queue::FLUSH_STAGE, thd, &LOCK_log)) | if (enter_stage(thd, Thread_queue::FLUSH_STAGE, thd, &LOCK_log)) | |||
return finish_commit(thd); | return finish_commit(thd); | |||
@endcode | @endcode | |||
@return Error code if the session commit failed, or zero on | @return Error code if the session commit failed, or zero on | |||
success. | success. | |||
*/ | */ | |||
int | int | |||
MYSQL_BIN_LOG::finish_commit(THD *thd) | MYSQL_BIN_LOG::finish_commit(THD *thd) | |||
{ | { | |||
if (thd->commit_error == 0 && thd->transaction.flags.commit_low) | if (thd->transaction.flags.commit_low) | |||
{ | { | |||
const bool all= thd->transaction.flags.real_commit; | const bool all= thd->transaction.flags.real_commit; | |||
thd->commit_error= ha_commit_low(thd, all); | /* | |||
storage engine commit | ||||
*/ | ||||
if (thd->commit_error == THD::CE_NONE && | ||||
ha_commit_low(thd, all, false)) | ||||
thd->commit_error= THD::CE_COMMIT_ERROR; | ||||
/* | ||||
Decrement the prepared XID counter after storage engine commit | ||||
*/ | ||||
if (thd->transaction.flags.xid_written) | if (thd->transaction.flags.xid_written) | |||
dec_prep_xids(); | dec_prep_xids(thd); | |||
/* | ||||
If commit succeeded, we call the after_commit hook | ||||
*/ | ||||
if (thd->commit_error == THD::CE_NONE) | ||||
{ | ||||
(void) RUN_HOOK(transaction, after_commit, (thd, all)); | ||||
thd->transaction.flags.run_hooks= false; | ||||
} | ||||
} | } | |||
else if (thd->transaction.flags.xid_written) | ||||
dec_prep_xids(thd); | ||||
thd->variables.gtid_next.set_undefined(); | thd->variables.gtid_next.set_undefined(); | |||
/* | /* | |||
Remove committed GTID from owned_gtids, it was already logged on | Remove committed GTID from owned_gtids, it was already logged on | |||
MYSQL_BIN_LOG::write_cache(). | MYSQL_BIN_LOG::write_cache(). | |||
*/ | */ | |||
global_sid_lock->rdlock(); | global_sid_lock->rdlock(); | |||
gtid_state->update_on_commit(thd); | gtid_state->update_on_commit(thd); | |||
global_sid_lock->unlock(); | global_sid_lock->unlock(); | |||
DBUG_ASSERT(thd->commit_error || !thd->transaction.flags.commit_low); | DBUG_ASSERT(thd->commit_error || !thd->transaction.flags.run_hooks); | |||
DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized()); | DBUG_ASSERT(!thd_get_cache_mngr(thd)->dbug_any_finalized()); | |||
DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", | DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", | |||
thd->thread_id, thd->commit_error)); | thd->thread_id, thd->commit_error)); | |||
return thd->commit_error; | return thd->commit_error; | |||
} | } | |||
/** | /** | |||
Flush and commit the transaction. | Flush and commit the transaction. | |||
This will execute an ordered flush and commit of all outstanding | This will execute an ordered flush and commit of all outstanding | |||
skipping to change at line 6378 | skipping to change at line 6591 | |||
Notes: | Notes: | |||
- It would be good if we could keep transaction coordinator | - It would be good if we could keep transaction coordinator | |||
log-specific data out of the THD structure, but that is not the | log-specific data out of the THD structure, but that is not the | |||
case right now. | case right now. | |||
- Everything in the transaction structure is reset when calling | - Everything in the transaction structure is reset when calling | |||
ha_commit_low since that calls st_transaction::cleanup. | ha_commit_low since that calls st_transaction::cleanup. | |||
*/ | */ | |||
thd->transaction.flags.pending= true; | thd->transaction.flags.pending= true; | |||
thd->commit_error= 0; | thd->commit_error= THD::CE_NONE; | |||
thd->next_to_commit= NULL; | thd->next_to_commit= NULL; | |||
thd->durability_property= HA_IGNORE_DURABILITY; | thd->durability_property= HA_IGNORE_DURABILITY; | |||
thd->transaction.flags.real_commit= all; | thd->transaction.flags.real_commit= all; | |||
thd->transaction.flags.xid_written= false; | thd->transaction.flags.xid_written= false; | |||
thd->transaction.flags.commit_low= !skip_commit; | thd->transaction.flags.commit_low= !skip_commit; | |||
thd->transaction.flags.run_hooks= !skip_commit; | ||||
#ifndef DBUG_OFF | #ifndef DBUG_OFF | |||
/* | /* | |||
The group commit Leader may have to wait for follower whose transactio n | The group commit Leader may have to wait for follower whose transactio n | |||
is not ready to be preempted. Initially the status is pessimistic. | is not ready to be preempted. Initially the status is pessimistic. | |||
Preemption guarding logics is necessary only when DBUG_ON is set. | Preemption guarding logics is necessary only when DBUG_ON is set. | |||
It won't be required for the dbug-off case as long as the follower won 't | It won't be required for the dbug-off case as long as the follower won 't | |||
execute any thread-specific write access code in this method, which is | execute any thread-specific write access code in this method, which is | |||
the case as of current. | the case as of current. | |||
*/ | */ | |||
thd->transaction.flags.ready_preempt= 0; | thd->transaction.flags.ready_preempt= 0; | |||
skipping to change at line 6478 | skipping to change at line 6692 | |||
if (opt_binlog_order_commits) | if (opt_binlog_order_commits) | |||
{ | { | |||
if (change_stage(thd, Stage_manager::COMMIT_STAGE, | if (change_stage(thd, Stage_manager::COMMIT_STAGE, | |||
final_queue, &LOCK_sync, &LOCK_commit)) | final_queue, &LOCK_sync, &LOCK_commit)) | |||
{ | { | |||
DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", | DBUG_PRINT("return", ("Thread ID: %lu, commit_error: %d", | |||
thd->thread_id, thd->commit_error)); | thd->thread_id, thd->commit_error)); | |||
DBUG_RETURN(finish_commit(thd)); | DBUG_RETURN(finish_commit(thd)); | |||
} | } | |||
THD *commit_queue= stage_manager.fetch_queue_for(Stage_manager::COMMIT_ STAGE); | THD *commit_queue= stage_manager.fetch_queue_for(Stage_manager::COMMIT_ STAGE); | |||
process_commit_stage_queue(thd, commit_queue, flush_error); | DBUG_EXECUTE_IF("semi_sync_3-way_deadlock", | |||
DEBUG_SYNC(thd, "before_process_commit_stage_queue");); | ||||
process_commit_stage_queue(thd, commit_queue); | ||||
mysql_mutex_unlock(&LOCK_commit); | mysql_mutex_unlock(&LOCK_commit); | |||
/* | ||||
Process after_commit after LOCK_commit is released for avoiding | ||||
3-way deadlock among user thread, rotate thread and dump thread. | ||||
*/ | ||||
process_after_commit_stage_queue(thd, commit_queue); | ||||
final_queue= commit_queue; | final_queue= commit_queue; | |||
} | } | |||
else | else | |||
mysql_mutex_unlock(&LOCK_sync); | mysql_mutex_unlock(&LOCK_sync); | |||
/* Commit done so signal all waiting threads */ | /* Commit done so signal all waiting threads */ | |||
stage_manager.signal_done(final_queue); | stage_manager.signal_done(final_queue); | |||
/* | /* | |||
Finish the commit before executing a rotate, or run the risk of a | Finish the commit before executing a rotate, or run the risk of a | |||
deadlock. We don't need the return value here since it is in | deadlock. We don't need the return value here since it is in | |||
thd->commit_error, which is returned below. | thd->commit_error, which is returned below. | |||
*/ | */ | |||
(void) finish_commit(thd); | (void) finish_commit(thd); | |||
/* | /* | |||
If we need to rotate, we do it now. | If we need to rotate, we do it without commit error. | |||
Otherwise the thd->commit_error will be possibly reset. | ||||
*/ | */ | |||
if (do_rotate) | if (do_rotate && thd->commit_error == THD::CE_NONE) | |||
{ | { | |||
/* | /* | |||
We can force the rotate since we did the check in | Do not force the rotate as several consecutive groups may | |||
flush_session_queue(). Giving "false" would have the same | request unnecessary rotations. | |||
result, but will do the check again. | ||||
NOTE: Run purge_logs wo/ holding LOCK_log because it does not | NOTE: Run purge_logs wo/ holding LOCK_log because it does not | |||
need the mutex. Otherwise causes various deadlocks. | need the mutex. Otherwise causes various deadlocks. | |||
NOTE: The LOCK_commit is necessary when doing a rotate, but that | ||||
is grabbed inside new_file_impl(). | ||||
*/ | */ | |||
DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE();); | DBUG_EXECUTE_IF("crash_commit_before_unlog", DBUG_SUICIDE();); | |||
DEBUG_SYNC(thd, "ready_to_do_rotation"); | ||||
bool check_purge= false; | bool check_purge= false; | |||
mysql_mutex_lock(&LOCK_log); | mysql_mutex_lock(&LOCK_log); | |||
int error= rotate(true, &check_purge); | int error= rotate(false, &check_purge); | |||
mysql_mutex_unlock(&LOCK_log); | mysql_mutex_unlock(&LOCK_log); | |||
if (!error && check_purge) | if (!error && check_purge) | |||
purge(); | purge(); | |||
else | else | |||
thd->commit_error= error; | thd->commit_error= THD::CE_COMMIT_ERROR; | |||
} | } | |||
DBUG_RETURN(thd->commit_error); | DBUG_RETURN(thd->commit_error); | |||
} | } | |||
/** | /** | |||
MYSQLD server recovers from last crashed binlog. | MYSQLD server recovers from last crashed binlog. | |||
@param log IO_CACHE of the crashed binlog. | @param log IO_CACHE of the crashed binlog. | |||
@param fdle Format_description_log_event of the crashed binlog. | @param fdle Format_description_log_event of the crashed binlog. | |||
@param valid_pos The position of the last valid transaction or | @param valid_pos The position of the last valid transaction or | |||
skipping to change at line 7469 | skipping to change at line 7688 | |||
add_to_binlog_accessed_dbs(""); | add_to_binlog_accessed_dbs(""); | |||
break; | break; | |||
} | } | |||
if (!is_current_stmt_binlog_format_row()) | if (!is_current_stmt_binlog_format_row()) | |||
add_to_binlog_accessed_dbs(table->db); | add_to_binlog_accessed_dbs(table->db); | |||
} | } | |||
} | } | |||
DBUG_PRINT("info", ("decision: logging in %s format", | DBUG_PRINT("info", ("decision: logging in %s format", | |||
is_current_stmt_binlog_format_row() ? | is_current_stmt_binlog_format_row() ? | |||
"ROW" : "STATEMENT")); | "ROW" : "STATEMENT")); | |||
if (variables.binlog_format == BINLOG_FORMAT_ROW && | ||||
(lex->sql_command == SQLCOM_UPDATE || | ||||
lex->sql_command == SQLCOM_UPDATE_MULTI || | ||||
lex->sql_command == SQLCOM_DELETE || | ||||
lex->sql_command == SQLCOM_DELETE_MULTI)) | ||||
{ | ||||
String table_names; | ||||
/* | ||||
Generate a warning for UPDATE/DELETE statements that modify a | ||||
BLACKHOLE table, as row events are not logged in row format. | ||||
*/ | ||||
for (TABLE_LIST *table= tables; table; table= table->next_global) | ||||
{ | ||||
if (table->placeholder()) | ||||
continue; | ||||
if (table->table->file->ht->db_type == DB_TYPE_BLACKHOLE_DB && | ||||
table->lock_type >= TL_WRITE_ALLOW_WRITE) | ||||
{ | ||||
table_names.append(table->table_name); | ||||
table_names.append(","); | ||||
} | ||||
} | ||||
if (!table_names.is_empty()) | ||||
{ | ||||
bool is_update= (lex->sql_command == SQLCOM_UPDATE || | ||||
lex->sql_command == SQLCOM_UPDATE_MULTI); | ||||
/* | ||||
Replace the last ',' with '.' for table_names | ||||
*/ | ||||
table_names.replace(table_names.length()-1, 1, ".", 1); | ||||
push_warning_printf(this, Sql_condition::WARN_LEVEL_WARN, | ||||
WARN_ON_BLOCKHOLE_IN_RBR, | ||||
ER(WARN_ON_BLOCKHOLE_IN_RBR), | ||||
is_update ? "UPDATE" : "DELETE", | ||||
table_names.c_ptr()); | ||||
} | ||||
} | ||||
} | } | |||
#ifndef DBUG_OFF | #ifndef DBUG_OFF | |||
else | else | |||
DBUG_PRINT("info", ("decision: no logging since " | DBUG_PRINT("info", ("decision: no logging since " | |||
"mysql_bin_log.is_open() = %d " | "mysql_bin_log.is_open() = %d " | |||
"and (options & OPTION_BIN_LOG) = 0x%llx " | "and (options & OPTION_BIN_LOG) = 0x%llx " | |||
"and binlog_format = %lu " | "and binlog_format = %lu " | |||
"and binlog_filter->db_ok(db) = %d", | "and binlog_filter->db_ok(db) = %d", | |||
mysql_bin_log.is_open(), | mysql_bin_log.is_open(), | |||
(variables.option_bits & OPTION_BIN_LOG), | (variables.option_bits & OPTION_BIN_LOG), | |||
skipping to change at line 7943 | skipping to change at line 8200 | |||
/** | /** | |||
Remove from read_set spurious columns. The write_set has been | Remove from read_set spurious columns. The write_set has been | |||
handled before in table->mark_columns_needed_for_update. | handled before in table->mark_columns_needed_for_update. | |||
*/ | */ | |||
DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", tabl e->read_set); | DBUG_PRINT_BITSET("debug", "table->read_set (before preparing): %s", tabl e->read_set); | |||
THD *thd= table->in_use; | THD *thd= table->in_use; | |||
/** | /** | |||
if there is a primary key in the table (ie, user declared PK or a | if there is a primary key in the table (ie, user declared PK or a | |||
non-null unique index) and we dont want to ship the entire image. | non-null unique index) and we dont want to ship the entire image, | |||
and the handler involved supports this. | ||||
*/ | */ | |||
if (table->s->primary_key < MAX_KEY && | if (table->s->primary_key < MAX_KEY && | |||
(thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL)) | (thd->variables.binlog_row_image < BINLOG_ROW_IMAGE_FULL) && | |||
!ha_check_storage_engine_flag(table->s->db_type(), HTON_NO_BINLOG_ROW | ||||
_OPT)) | ||||
{ | { | |||
/** | /** | |||
Just to be sure that tmp_set is currently not in use as | Just to be sure that tmp_set is currently not in use as | |||
the read_set already. | the read_set already. | |||
*/ | */ | |||
DBUG_ASSERT(table->read_set != &table->tmp_set); | DBUG_ASSERT(table->read_set != &table->tmp_set); | |||
bitmap_clear_all(&table->tmp_set); | bitmap_clear_all(&table->tmp_set); | |||
switch(thd->variables.binlog_row_image) | switch(thd->variables.binlog_row_image) | |||
End of changes. 74 change blocks. | ||||
73 lines changed or deleted | 341 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ |